1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <cstdint>
19#include <cstdlib>
20#include <cstring>
21#include <memory>
22#include <string>
23
24#include <gtest/gtest.h>
25
26#include "arrow/buffer.h"
27#include "arrow/io/interfaces.h"
28#include "arrow/io/memory.h"
29#include "arrow/status.h"
30#include "arrow/test-util.h"
31#include "arrow/util/checked_cast.h"
32
33namespace arrow {
34
35using internal::checked_cast;
36
37namespace io {
38
39class TestBufferOutputStream : public ::testing::Test {
40 public:
41 void SetUp() {
42 ASSERT_OK(AllocateResizableBuffer(0, &buffer_));
43 stream_.reset(new BufferOutputStream(buffer_));
44 }
45
46 protected:
47 std::shared_ptr<ResizableBuffer> buffer_;
48 std::unique_ptr<OutputStream> stream_;
49};
50
51TEST_F(TestBufferOutputStream, DtorCloses) {
52 std::string data = "data123456";
53
54 const int K = 100;
55 for (int i = 0; i < K; ++i) {
56 EXPECT_OK(stream_->Write(data));
57 }
58
59 stream_ = nullptr;
60 ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size());
61}
62
63TEST_F(TestBufferOutputStream, CloseResizes) {
64 std::string data = "data123456";
65
66 const int K = 100;
67 for (int i = 0; i < K; ++i) {
68 EXPECT_OK(stream_->Write(data));
69 }
70
71 ASSERT_OK(stream_->Close());
72 ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size());
73}
74
75TEST_F(TestBufferOutputStream, WriteAfterFinish) {
76 std::string data = "data123456";
77 ASSERT_OK(stream_->Write(data));
78
79 auto buffer_stream = checked_cast<BufferOutputStream*>(stream_.get());
80
81 std::shared_ptr<Buffer> buffer;
82 ASSERT_OK(buffer_stream->Finish(&buffer));
83
84 ASSERT_RAISES(IOError, stream_->Write(data));
85}
86
87TEST_F(TestBufferOutputStream, Reset) {
88 std::string data = "data123456";
89
90 auto stream = checked_cast<BufferOutputStream*>(stream_.get());
91
92 ASSERT_OK(stream->Write(data));
93
94 std::shared_ptr<Buffer> buffer;
95 ASSERT_OK(stream->Finish(&buffer));
96 ASSERT_EQ(buffer->size(), static_cast<int64_t>(data.size()));
97
98 ASSERT_OK(stream->Reset(2048));
99 ASSERT_OK(stream->Write(data));
100 ASSERT_OK(stream->Write(data));
101 std::shared_ptr<Buffer> buffer2;
102 ASSERT_OK(stream->Finish(&buffer2));
103
104 ASSERT_EQ(buffer2->size(), static_cast<int64_t>(data.size() * 2));
105}
106
107TEST(TestFixedSizeBufferWriter, Basics) {
108 std::shared_ptr<Buffer> buffer;
109 ASSERT_OK(AllocateBuffer(1024, &buffer));
110
111 FixedSizeBufferWriter writer(buffer);
112
113 int64_t position;
114 ASSERT_OK(writer.Tell(&position));
115 ASSERT_EQ(0, position);
116
117 std::string data = "data123456";
118 auto nbytes = static_cast<int64_t>(data.size());
119 ASSERT_OK(writer.Write(data.c_str(), nbytes));
120
121 ASSERT_OK(writer.Tell(&position));
122 ASSERT_EQ(nbytes, position);
123
124 ASSERT_OK(writer.Seek(4));
125 ASSERT_OK(writer.Tell(&position));
126 ASSERT_EQ(4, position);
127
128 ASSERT_OK(writer.Seek(1024));
129 ASSERT_OK(writer.Tell(&position));
130 ASSERT_EQ(1024, position);
131
132 // Write out of bounds
133 ASSERT_RAISES(IOError, writer.Write(data.c_str(), 1));
134
135 // Seek out of bounds
136 ASSERT_RAISES(IOError, writer.Seek(-1));
137 ASSERT_RAISES(IOError, writer.Seek(1025));
138
139 ASSERT_OK(writer.Close());
140}
141
142TEST(TestBufferReader, FromStrings) {
143 // ARROW-3291: construct BufferReader from std::string or
144 // arrow::util::string_view
145
146 std::string data = "data123456";
147 auto view = util::string_view(data);
148
149 BufferReader reader1(data);
150 BufferReader reader2(view);
151
152 std::shared_ptr<Buffer> piece;
153 ASSERT_OK(reader1.Read(4, &piece));
154 ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4));
155
156 ASSERT_OK(reader2.Seek(2));
157 ASSERT_OK(reader2.Read(4, &piece));
158 ASSERT_EQ(0, memcmp(piece->data(), data.data() + 2, 4));
159}
160
161TEST(TestBufferReader, Seeking) {
162 std::string data = "data123456";
163
164 BufferReader reader(data);
165 int64_t pos;
166 ASSERT_OK(reader.Tell(&pos));
167 ASSERT_EQ(pos, 0);
168
169 ASSERT_OK(reader.Seek(9));
170 ASSERT_OK(reader.Tell(&pos));
171 ASSERT_EQ(pos, 9);
172
173 ASSERT_OK(reader.Seek(10));
174 ASSERT_OK(reader.Tell(&pos));
175 ASSERT_EQ(pos, 10);
176
177 ASSERT_RAISES(IOError, reader.Seek(11));
178 ASSERT_OK(reader.Tell(&pos));
179 ASSERT_EQ(pos, 10);
180}
181
182TEST(TestBufferReader, Peek) {
183 std::string data = "data123456";
184
185 BufferReader reader(std::make_shared<Buffer>(data));
186
187 auto view = reader.Peek(4);
188
189 ASSERT_EQ(4, view.size());
190 ASSERT_EQ(data.substr(0, 4), view.to_string());
191
192 view = reader.Peek(20);
193 ASSERT_EQ(data.size(), view.size());
194 ASSERT_EQ(data, view.to_string());
195}
196
197TEST(TestBufferReader, RetainParentReference) {
198 // ARROW-387
199 std::string data = "data123456";
200
201 std::shared_ptr<Buffer> slice1;
202 std::shared_ptr<Buffer> slice2;
203 {
204 std::shared_ptr<Buffer> buffer;
205 ASSERT_OK(AllocateBuffer(nullptr, static_cast<int64_t>(data.size()), &buffer));
206 std::memcpy(buffer->mutable_data(), data.c_str(), data.size());
207 BufferReader reader(buffer);
208 ASSERT_OK(reader.Read(4, &slice1));
209 ASSERT_OK(reader.Read(6, &slice2));
210 }
211
212 ASSERT_TRUE(slice1->parent() != nullptr);
213
214 ASSERT_EQ(0, std::memcmp(slice1->data(), data.c_str(), 4));
215 ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
216}
217
218TEST(TestMemcopy, ParallelMemcopy) {
219#if defined(ARROW_VALGRIND)
220 // Compensate for Valgrind's slowness
221 constexpr int64_t THRESHOLD = 32 * 1024;
222#else
223 constexpr int64_t THRESHOLD = 1024 * 1024;
224#endif
225
226 for (int i = 0; i < 5; ++i) {
227 // randomize size so the memcopy alignment is tested
228 int64_t total_size = 3 * THRESHOLD + std::rand() % 100;
229
230 std::shared_ptr<Buffer> buffer1, buffer2;
231
232 ASSERT_OK(AllocateBuffer(total_size, &buffer1));
233 ASSERT_OK(AllocateBuffer(total_size, &buffer2));
234
235 random_bytes(total_size, 0, buffer2->mutable_data());
236
237 io::FixedSizeBufferWriter writer(buffer1);
238 writer.set_memcopy_threads(4);
239 writer.set_memcopy_threshold(THRESHOLD);
240 ASSERT_OK(writer.Write(buffer2->data(), buffer2->size()));
241
242 ASSERT_EQ(0, memcmp(buffer1->data(), buffer2->data(), buffer1->size()));
243 }
244}
245
246} // namespace io
247} // namespace arrow
248