1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <cstdint> |
19 | #include <cstdlib> |
20 | #include <cstring> |
21 | #include <memory> |
22 | #include <string> |
23 | |
24 | #include <gtest/gtest.h> |
25 | |
26 | #include "arrow/buffer.h" |
27 | #include "arrow/io/interfaces.h" |
28 | #include "arrow/io/memory.h" |
29 | #include "arrow/status.h" |
30 | #include "arrow/test-util.h" |
31 | #include "arrow/util/checked_cast.h" |
32 | |
33 | namespace arrow { |
34 | |
35 | using internal::checked_cast; |
36 | |
37 | namespace io { |
38 | |
39 | class TestBufferOutputStream : public ::testing::Test { |
40 | public: |
41 | void SetUp() { |
42 | ASSERT_OK(AllocateResizableBuffer(0, &buffer_)); |
43 | stream_.reset(new BufferOutputStream(buffer_)); |
44 | } |
45 | |
46 | protected: |
47 | std::shared_ptr<ResizableBuffer> buffer_; |
48 | std::unique_ptr<OutputStream> stream_; |
49 | }; |
50 | |
51 | TEST_F(TestBufferOutputStream, DtorCloses) { |
52 | std::string data = "data123456" ; |
53 | |
54 | const int K = 100; |
55 | for (int i = 0; i < K; ++i) { |
56 | EXPECT_OK(stream_->Write(data)); |
57 | } |
58 | |
59 | stream_ = nullptr; |
60 | ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size()); |
61 | } |
62 | |
63 | TEST_F(TestBufferOutputStream, CloseResizes) { |
64 | std::string data = "data123456" ; |
65 | |
66 | const int K = 100; |
67 | for (int i = 0; i < K; ++i) { |
68 | EXPECT_OK(stream_->Write(data)); |
69 | } |
70 | |
71 | ASSERT_OK(stream_->Close()); |
72 | ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size()); |
73 | } |
74 | |
75 | TEST_F(TestBufferOutputStream, WriteAfterFinish) { |
76 | std::string data = "data123456" ; |
77 | ASSERT_OK(stream_->Write(data)); |
78 | |
79 | auto buffer_stream = checked_cast<BufferOutputStream*>(stream_.get()); |
80 | |
81 | std::shared_ptr<Buffer> buffer; |
82 | ASSERT_OK(buffer_stream->Finish(&buffer)); |
83 | |
84 | ASSERT_RAISES(IOError, stream_->Write(data)); |
85 | } |
86 | |
87 | TEST_F(TestBufferOutputStream, Reset) { |
88 | std::string data = "data123456" ; |
89 | |
90 | auto stream = checked_cast<BufferOutputStream*>(stream_.get()); |
91 | |
92 | ASSERT_OK(stream->Write(data)); |
93 | |
94 | std::shared_ptr<Buffer> buffer; |
95 | ASSERT_OK(stream->Finish(&buffer)); |
96 | ASSERT_EQ(buffer->size(), static_cast<int64_t>(data.size())); |
97 | |
98 | ASSERT_OK(stream->Reset(2048)); |
99 | ASSERT_OK(stream->Write(data)); |
100 | ASSERT_OK(stream->Write(data)); |
101 | std::shared_ptr<Buffer> buffer2; |
102 | ASSERT_OK(stream->Finish(&buffer2)); |
103 | |
104 | ASSERT_EQ(buffer2->size(), static_cast<int64_t>(data.size() * 2)); |
105 | } |
106 | |
107 | TEST(TestFixedSizeBufferWriter, Basics) { |
108 | std::shared_ptr<Buffer> buffer; |
109 | ASSERT_OK(AllocateBuffer(1024, &buffer)); |
110 | |
111 | FixedSizeBufferWriter writer(buffer); |
112 | |
113 | int64_t position; |
114 | ASSERT_OK(writer.Tell(&position)); |
115 | ASSERT_EQ(0, position); |
116 | |
117 | std::string data = "data123456" ; |
118 | auto nbytes = static_cast<int64_t>(data.size()); |
119 | ASSERT_OK(writer.Write(data.c_str(), nbytes)); |
120 | |
121 | ASSERT_OK(writer.Tell(&position)); |
122 | ASSERT_EQ(nbytes, position); |
123 | |
124 | ASSERT_OK(writer.Seek(4)); |
125 | ASSERT_OK(writer.Tell(&position)); |
126 | ASSERT_EQ(4, position); |
127 | |
128 | ASSERT_OK(writer.Seek(1024)); |
129 | ASSERT_OK(writer.Tell(&position)); |
130 | ASSERT_EQ(1024, position); |
131 | |
132 | // Write out of bounds |
133 | ASSERT_RAISES(IOError, writer.Write(data.c_str(), 1)); |
134 | |
135 | // Seek out of bounds |
136 | ASSERT_RAISES(IOError, writer.Seek(-1)); |
137 | ASSERT_RAISES(IOError, writer.Seek(1025)); |
138 | |
139 | ASSERT_OK(writer.Close()); |
140 | } |
141 | |
142 | TEST(TestBufferReader, FromStrings) { |
143 | // ARROW-3291: construct BufferReader from std::string or |
144 | // arrow::util::string_view |
145 | |
146 | std::string data = "data123456" ; |
147 | auto view = util::string_view(data); |
148 | |
149 | BufferReader reader1(data); |
150 | BufferReader reader2(view); |
151 | |
152 | std::shared_ptr<Buffer> piece; |
153 | ASSERT_OK(reader1.Read(4, &piece)); |
154 | ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4)); |
155 | |
156 | ASSERT_OK(reader2.Seek(2)); |
157 | ASSERT_OK(reader2.Read(4, &piece)); |
158 | ASSERT_EQ(0, memcmp(piece->data(), data.data() + 2, 4)); |
159 | } |
160 | |
161 | TEST(TestBufferReader, Seeking) { |
162 | std::string data = "data123456" ; |
163 | |
164 | BufferReader reader(data); |
165 | int64_t pos; |
166 | ASSERT_OK(reader.Tell(&pos)); |
167 | ASSERT_EQ(pos, 0); |
168 | |
169 | ASSERT_OK(reader.Seek(9)); |
170 | ASSERT_OK(reader.Tell(&pos)); |
171 | ASSERT_EQ(pos, 9); |
172 | |
173 | ASSERT_OK(reader.Seek(10)); |
174 | ASSERT_OK(reader.Tell(&pos)); |
175 | ASSERT_EQ(pos, 10); |
176 | |
177 | ASSERT_RAISES(IOError, reader.Seek(11)); |
178 | ASSERT_OK(reader.Tell(&pos)); |
179 | ASSERT_EQ(pos, 10); |
180 | } |
181 | |
182 | TEST(TestBufferReader, Peek) { |
183 | std::string data = "data123456" ; |
184 | |
185 | BufferReader reader(std::make_shared<Buffer>(data)); |
186 | |
187 | auto view = reader.Peek(4); |
188 | |
189 | ASSERT_EQ(4, view.size()); |
190 | ASSERT_EQ(data.substr(0, 4), view.to_string()); |
191 | |
192 | view = reader.Peek(20); |
193 | ASSERT_EQ(data.size(), view.size()); |
194 | ASSERT_EQ(data, view.to_string()); |
195 | } |
196 | |
197 | TEST(TestBufferReader, RetainParentReference) { |
198 | // ARROW-387 |
199 | std::string data = "data123456" ; |
200 | |
201 | std::shared_ptr<Buffer> slice1; |
202 | std::shared_ptr<Buffer> slice2; |
203 | { |
204 | std::shared_ptr<Buffer> buffer; |
205 | ASSERT_OK(AllocateBuffer(nullptr, static_cast<int64_t>(data.size()), &buffer)); |
206 | std::memcpy(buffer->mutable_data(), data.c_str(), data.size()); |
207 | BufferReader reader(buffer); |
208 | ASSERT_OK(reader.Read(4, &slice1)); |
209 | ASSERT_OK(reader.Read(6, &slice2)); |
210 | } |
211 | |
212 | ASSERT_TRUE(slice1->parent() != nullptr); |
213 | |
214 | ASSERT_EQ(0, std::memcmp(slice1->data(), data.c_str(), 4)); |
215 | ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6)); |
216 | } |
217 | |
218 | TEST(TestMemcopy, ParallelMemcopy) { |
219 | #if defined(ARROW_VALGRIND) |
220 | // Compensate for Valgrind's slowness |
221 | constexpr int64_t THRESHOLD = 32 * 1024; |
222 | #else |
223 | constexpr int64_t THRESHOLD = 1024 * 1024; |
224 | #endif |
225 | |
226 | for (int i = 0; i < 5; ++i) { |
227 | // randomize size so the memcopy alignment is tested |
228 | int64_t total_size = 3 * THRESHOLD + std::rand() % 100; |
229 | |
230 | std::shared_ptr<Buffer> buffer1, buffer2; |
231 | |
232 | ASSERT_OK(AllocateBuffer(total_size, &buffer1)); |
233 | ASSERT_OK(AllocateBuffer(total_size, &buffer2)); |
234 | |
235 | random_bytes(total_size, 0, buffer2->mutable_data()); |
236 | |
237 | io::FixedSizeBufferWriter writer(buffer1); |
238 | writer.set_memcopy_threads(4); |
239 | writer.set_memcopy_threshold(THRESHOLD); |
240 | ASSERT_OK(writer.Write(buffer2->data(), buffer2->size())); |
241 | |
242 | ASSERT_EQ(0, memcmp(buffer1->data(), buffer2->data(), buffer1->size())); |
243 | } |
244 | } |
245 | |
246 | } // namespace io |
247 | } // namespace arrow |
248 | |