1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <atomic> |
19 | #include <cstdint> |
20 | #include <cstdlib> |
21 | #include <cstring> |
22 | #include <iostream> |
23 | #include <memory> |
24 | #include <sstream> // IWYU pragma: keep |
25 | #include <string> |
26 | #include <thread> |
27 | #include <vector> |
28 | |
29 | #include <gtest/gtest.h> |
30 | |
31 | #include <boost/filesystem.hpp> // NOLINT |
32 | |
33 | #include "arrow/buffer.h" |
34 | #include "arrow/io/hdfs-internal.h" |
35 | #include "arrow/io/hdfs.h" |
36 | #include "arrow/io/interfaces.h" |
37 | #include "arrow/status.h" |
38 | #include "arrow/test-util.h" |
39 | |
40 | namespace arrow { |
41 | namespace io { |
42 | |
43 | std::vector<uint8_t> RandomData(int64_t size) { |
44 | std::vector<uint8_t> buffer(size); |
45 | random_bytes(size, 0, buffer.data()); |
46 | return buffer; |
47 | } |
48 | |
// Tag type selecting the JNI-based libhdfs driver for the typed test suite.
struct JNIDriver {
  static HdfsDriver type;
};
52 | |
// Tag type selecting the Pivotal libhdfs3 driver for the typed test suite.
struct PivotalDriver {
  static HdfsDriver type;
};
56 | |
57 | template <typename DRIVER> |
58 | class TestHadoopFileSystem : public ::testing::Test { |
59 | public: |
60 | Status MakeScratchDir() { |
61 | if (client_->Exists(scratch_dir_)) { |
62 | RETURN_NOT_OK((client_->Delete(scratch_dir_, true))); |
63 | } |
64 | return client_->MakeDirectory(scratch_dir_); |
65 | } |
66 | |
67 | Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, |
68 | bool append = false, int buffer_size = 0, int16_t replication = 0, |
69 | int default_block_size = 0) { |
70 | std::shared_ptr<HdfsOutputStream> file; |
71 | RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication, |
72 | default_block_size, &file)); |
73 | |
74 | RETURN_NOT_OK(file->Write(buffer, size)); |
75 | RETURN_NOT_OK(file->Close()); |
76 | |
77 | return Status::OK(); |
78 | } |
79 | |
80 | std::string ScratchPath(const std::string& name) { |
81 | std::stringstream ss; |
82 | ss << scratch_dir_ << "/" << name; |
83 | return ss.str(); |
84 | } |
85 | |
86 | std::string HdfsAbsPath(const std::string& relpath) { |
87 | std::stringstream ss; |
88 | ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath; |
89 | return ss.str(); |
90 | } |
91 | |
92 | // Set up shared state between unit tests |
93 | void SetUp() { |
94 | internal::LibHdfsShim* driver_shim; |
95 | |
96 | client_ = nullptr; |
97 | scratch_dir_ = |
98 | boost::filesystem::unique_path(boost::filesystem::temp_directory_path() / |
99 | "arrow-hdfs/scratch-%%%%" ) |
100 | .string(); |
101 | |
102 | loaded_driver_ = false; |
103 | |
104 | Status msg; |
105 | |
106 | if (DRIVER::type == HdfsDriver::LIBHDFS) { |
107 | msg = ConnectLibHdfs(&driver_shim); |
108 | if (!msg.ok()) { |
109 | std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; |
110 | return; |
111 | } |
112 | } else { |
113 | msg = ConnectLibHdfs3(&driver_shim); |
114 | if (!msg.ok()) { |
115 | std::cout << "Loading libhdfs3 failed, skipping tests gracefully. " |
116 | << msg.ToString() << std::endl; |
117 | return; |
118 | } |
119 | } |
120 | |
121 | loaded_driver_ = true; |
122 | |
123 | const char* host = std::getenv("ARROW_HDFS_TEST_HOST" ); |
124 | const char* port = std::getenv("ARROW_HDFS_TEST_PORT" ); |
125 | const char* user = std::getenv("ARROW_HDFS_TEST_USER" ); |
126 | |
127 | ASSERT_TRUE(user != nullptr) << "Set ARROW_HDFS_TEST_USER" ; |
128 | |
129 | conf_.host = host == nullptr ? "localhost" : host; |
130 | conf_.user = user; |
131 | conf_.port = port == nullptr ? 20500 : atoi(port); |
132 | conf_.driver = DRIVER::type; |
133 | |
134 | ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_)); |
135 | } |
136 | |
137 | void TearDown() { |
138 | if (client_) { |
139 | if (client_->Exists(scratch_dir_)) { |
140 | EXPECT_OK(client_->Delete(scratch_dir_, true)); |
141 | } |
142 | EXPECT_OK(client_->Disconnect()); |
143 | } |
144 | } |
145 | |
146 | HdfsConnectionConfig conf_; |
147 | bool loaded_driver_; |
148 | |
149 | // Resources shared amongst unit tests |
150 | std::string scratch_dir_; |
151 | std::shared_ptr<HadoopFileSystem> client_; |
152 | }; |
153 | |
154 | template <> |
155 | std::string TestHadoopFileSystem<PivotalDriver>::HdfsAbsPath(const std::string& relpath) { |
156 | std::stringstream ss; |
157 | ss << relpath; |
158 | return ss.str(); |
159 | } |
160 | |
// Bail out of the current test body when SetUp() could not load the driver
// library (e.g. libhdfs/libhdfs3 is not installed on this machine).
#define SKIP_IF_NO_DRIVER()                                  \
  if (!this->loaded_driver_) {                               \
    std::cout << "Driver not loaded, skipping" << std::endl; \
    return;                                                  \
  }
166 | |
// Out-of-class definitions for the per-driver tag types declared above.
HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;

// Instantiate every TYPED_TEST below once per driver implementation.
typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes;
TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes);
172 | |
173 | TYPED_TEST(TestHadoopFileSystem, ConnectsAgain) { |
174 | SKIP_IF_NO_DRIVER(); |
175 | |
176 | std::shared_ptr<HadoopFileSystem> client; |
177 | ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client)); |
178 | ASSERT_OK(client->Disconnect()); |
179 | } |
180 | |
181 | TYPED_TEST(TestHadoopFileSystem, MultipleClients) { |
182 | SKIP_IF_NO_DRIVER(); |
183 | |
184 | ASSERT_OK(this->MakeScratchDir()); |
185 | |
186 | std::shared_ptr<HadoopFileSystem> client1; |
187 | std::shared_ptr<HadoopFileSystem> client2; |
188 | ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client1)); |
189 | ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client2)); |
190 | ASSERT_OK(client1->Disconnect()); |
191 | |
192 | // client2 continues to function after equivalent client1 has shutdown |
193 | std::vector<HdfsPathInfo> listing; |
194 | ASSERT_OK(client2->ListDirectory(this->scratch_dir_, &listing)); |
195 | ASSERT_OK(client2->Disconnect()); |
196 | } |
197 | |
198 | TYPED_TEST(TestHadoopFileSystem, MakeDirectory) { |
199 | SKIP_IF_NO_DRIVER(); |
200 | |
201 | std::string path = this->ScratchPath("create-directory" ); |
202 | |
203 | if (this->client_->Exists(path)) { |
204 | ASSERT_OK(this->client_->Delete(path, true)); |
205 | } |
206 | |
207 | ASSERT_OK(this->client_->MakeDirectory(path)); |
208 | ASSERT_TRUE(this->client_->Exists(path)); |
209 | std::vector<HdfsPathInfo> listing; |
210 | EXPECT_OK(this->client_->ListDirectory(path, &listing)); |
211 | ASSERT_EQ(0, listing.size()); |
212 | EXPECT_OK(this->client_->Delete(path, true)); |
213 | ASSERT_FALSE(this->client_->Exists(path)); |
214 | ASSERT_RAISES(IOError, this->client_->ListDirectory(path, &listing)); |
215 | } |
216 | |
217 | TYPED_TEST(TestHadoopFileSystem, GetCapacityUsed) { |
218 | SKIP_IF_NO_DRIVER(); |
219 | |
220 | // Who knows what is actually in your DFS cluster, but expect it to have |
221 | // positive used bytes and capacity |
222 | int64_t nbytes = 0; |
223 | ASSERT_OK(this->client_->GetCapacity(&nbytes)); |
224 | ASSERT_LT(0, nbytes); |
225 | |
226 | ASSERT_OK(this->client_->GetUsed(&nbytes)); |
227 | ASSERT_LT(0, nbytes); |
228 | } |
229 | |
230 | TYPED_TEST(TestHadoopFileSystem, GetPathInfo) { |
231 | SKIP_IF_NO_DRIVER(); |
232 | |
233 | HdfsPathInfo info; |
234 | |
235 | ASSERT_OK(this->MakeScratchDir()); |
236 | |
237 | // Directory info |
238 | ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info)); |
239 | ASSERT_EQ(ObjectType::DIRECTORY, info.kind); |
240 | ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name); |
241 | ASSERT_EQ(this->conf_.user, info.owner); |
242 | |
243 | // TODO(wesm): test group, other attrs |
244 | |
245 | auto path = this->ScratchPath("test-file" ); |
246 | |
247 | const int size = 100; |
248 | |
249 | std::vector<uint8_t> buffer = RandomData(size); |
250 | |
251 | ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size)); |
252 | ASSERT_OK(this->client_->GetPathInfo(path, &info)); |
253 | |
254 | ASSERT_EQ(ObjectType::FILE, info.kind); |
255 | ASSERT_EQ(this->HdfsAbsPath(path), info.name); |
256 | ASSERT_EQ(this->conf_.user, info.owner); |
257 | ASSERT_EQ(size, info.size); |
258 | } |
259 | |
260 | TYPED_TEST(TestHadoopFileSystem, GetPathInfoNotExist) { |
261 | // ARROW-2919: Test that the error message is reasonable |
262 | SKIP_IF_NO_DRIVER(); |
263 | |
264 | ASSERT_OK(this->MakeScratchDir()); |
265 | auto path = this->ScratchPath("path-does-not-exist" ); |
266 | |
267 | HdfsPathInfo info; |
268 | Status s = this->client_->GetPathInfo(path, &info); |
269 | ASSERT_TRUE(s.IsIOError()); |
270 | |
271 | const std::string error_message = s.ToString(); |
272 | |
273 | // Check that the file path is found in the error message |
274 | ASSERT_LT(error_message.find(path), std::string::npos); |
275 | } |
276 | |
277 | TYPED_TEST(TestHadoopFileSystem, AppendToFile) { |
278 | SKIP_IF_NO_DRIVER(); |
279 | |
280 | ASSERT_OK(this->MakeScratchDir()); |
281 | |
282 | auto path = this->ScratchPath("test-file" ); |
283 | const int size = 100; |
284 | |
285 | std::vector<uint8_t> buffer = RandomData(size); |
286 | ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size)); |
287 | |
288 | // now append |
289 | ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true)); |
290 | |
291 | HdfsPathInfo info; |
292 | ASSERT_OK(this->client_->GetPathInfo(path, &info)); |
293 | ASSERT_EQ(size * 2, info.size); |
294 | } |
295 | |
296 | TYPED_TEST(TestHadoopFileSystem, ListDirectory) { |
297 | SKIP_IF_NO_DRIVER(); |
298 | |
299 | const int size = 100; |
300 | std::vector<uint8_t> data = RandomData(size); |
301 | |
302 | auto p1 = this->ScratchPath("test-file-1" ); |
303 | auto p2 = this->ScratchPath("test-file-2" ); |
304 | auto d1 = this->ScratchPath("test-dir-1" ); |
305 | |
306 | ASSERT_OK(this->MakeScratchDir()); |
307 | ASSERT_OK(this->WriteDummyFile(p1, data.data(), size)); |
308 | ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2)); |
309 | ASSERT_OK(this->client_->MakeDirectory(d1)); |
310 | |
311 | std::vector<HdfsPathInfo> listing; |
312 | ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing)); |
313 | |
314 | // Do it again, appends! |
315 | ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing)); |
316 | |
317 | ASSERT_EQ(6, static_cast<int>(listing.size())); |
318 | |
319 | // Argh, well, shouldn't expect the listing to be in any particular order |
320 | for (size_t i = 0; i < listing.size(); ++i) { |
321 | const HdfsPathInfo& info = listing[i]; |
322 | if (info.name == this->HdfsAbsPath(p1)) { |
323 | ASSERT_EQ(ObjectType::FILE, info.kind); |
324 | ASSERT_EQ(size, info.size); |
325 | } else if (info.name == this->HdfsAbsPath(p2)) { |
326 | ASSERT_EQ(ObjectType::FILE, info.kind); |
327 | ASSERT_EQ(size / 2, info.size); |
328 | } else if (info.name == this->HdfsAbsPath(d1)) { |
329 | ASSERT_EQ(ObjectType::DIRECTORY, info.kind); |
330 | } else { |
331 | FAIL() << "Unexpected path: " << info.name; |
332 | } |
333 | } |
334 | } |
335 | |
336 | TYPED_TEST(TestHadoopFileSystem, ReadableMethods) { |
337 | SKIP_IF_NO_DRIVER(); |
338 | |
339 | ASSERT_OK(this->MakeScratchDir()); |
340 | |
341 | auto path = this->ScratchPath("test-file" ); |
342 | const int size = 100; |
343 | |
344 | std::vector<uint8_t> data = RandomData(size); |
345 | ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); |
346 | |
347 | std::shared_ptr<HdfsReadableFile> file; |
348 | ASSERT_OK(this->client_->OpenReadable(path, &file)); |
349 | |
350 | // Test GetSize -- move this into its own unit test if ever needed |
351 | int64_t file_size; |
352 | ASSERT_OK(file->GetSize(&file_size)); |
353 | ASSERT_EQ(size, file_size); |
354 | |
355 | uint8_t buffer[50]; |
356 | int64_t bytes_read = 0; |
357 | |
358 | ASSERT_OK(file->Read(50, &bytes_read, buffer)); |
359 | ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50)); |
360 | ASSERT_EQ(50, bytes_read); |
361 | |
362 | ASSERT_OK(file->Read(50, &bytes_read, buffer)); |
363 | ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50)); |
364 | ASSERT_EQ(50, bytes_read); |
365 | |
366 | // EOF |
367 | ASSERT_OK(file->Read(1, &bytes_read, buffer)); |
368 | ASSERT_EQ(0, bytes_read); |
369 | |
370 | // ReadAt to EOF |
371 | ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer)); |
372 | ASSERT_EQ(40, bytes_read); |
373 | ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read)); |
374 | |
375 | // Seek, Tell |
376 | ASSERT_OK(file->Seek(60)); |
377 | |
378 | int64_t position; |
379 | ASSERT_OK(file->Tell(&position)); |
380 | ASSERT_EQ(60, position); |
381 | } |
382 | |
383 | TYPED_TEST(TestHadoopFileSystem, LargeFile) { |
384 | SKIP_IF_NO_DRIVER(); |
385 | |
386 | ASSERT_OK(this->MakeScratchDir()); |
387 | |
388 | auto path = this->ScratchPath("test-large-file" ); |
389 | const int size = 1000000; |
390 | |
391 | std::vector<uint8_t> data = RandomData(size); |
392 | ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); |
393 | |
394 | std::shared_ptr<HdfsReadableFile> file; |
395 | ASSERT_OK(this->client_->OpenReadable(path, &file)); |
396 | |
397 | ASSERT_FALSE(file->closed()); |
398 | |
399 | std::shared_ptr<Buffer> buffer; |
400 | ASSERT_OK(AllocateBuffer(nullptr, size, &buffer)); |
401 | |
402 | int64_t bytes_read = 0; |
403 | |
404 | ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data())); |
405 | ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size)); |
406 | ASSERT_EQ(size, bytes_read); |
407 | |
408 | // explicit buffer size |
409 | std::shared_ptr<HdfsReadableFile> file2; |
410 | ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2)); |
411 | |
412 | std::shared_ptr<Buffer> buffer2; |
413 | ASSERT_OK(AllocateBuffer(nullptr, size, &buffer2)); |
414 | |
415 | ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data())); |
416 | ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size)); |
417 | ASSERT_EQ(size, bytes_read); |
418 | } |
419 | |
420 | TYPED_TEST(TestHadoopFileSystem, RenameFile) { |
421 | SKIP_IF_NO_DRIVER(); |
422 | ASSERT_OK(this->MakeScratchDir()); |
423 | |
424 | auto src_path = this->ScratchPath("src-file" ); |
425 | auto dst_path = this->ScratchPath("dst-file" ); |
426 | const int size = 100; |
427 | |
428 | std::vector<uint8_t> data = RandomData(size); |
429 | ASSERT_OK(this->WriteDummyFile(src_path, data.data(), size)); |
430 | |
431 | ASSERT_OK(this->client_->Rename(src_path, dst_path)); |
432 | |
433 | ASSERT_FALSE(this->client_->Exists(src_path)); |
434 | ASSERT_TRUE(this->client_->Exists(dst_path)); |
435 | } |
436 | |
437 | TYPED_TEST(TestHadoopFileSystem, ChmodChown) { |
438 | SKIP_IF_NO_DRIVER(); |
439 | ASSERT_OK(this->MakeScratchDir()); |
440 | |
441 | auto path = this->ScratchPath("path-to-chmod" ); |
442 | |
443 | int16_t mode = 0755; |
444 | const int size = 100; |
445 | |
446 | std::vector<uint8_t> data = RandomData(size); |
447 | ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); |
448 | |
449 | HdfsPathInfo info; |
450 | ASSERT_OK(this->client_->Chmod(path, mode)); |
451 | ASSERT_OK(this->client_->GetPathInfo(path, &info)); |
452 | ASSERT_EQ(mode, info.permissions); |
453 | |
454 | std::string owner = "hadoop" ; |
455 | std::string group = "hadoop" ; |
456 | ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str())); |
457 | ASSERT_OK(this->client_->GetPathInfo(path, &info)); |
458 | ASSERT_EQ("hadoop" , info.owner); |
459 | ASSERT_EQ("hadoop" , info.group); |
460 | } |
461 | |
462 | TYPED_TEST(TestHadoopFileSystem, ThreadSafety) { |
463 | SKIP_IF_NO_DRIVER(); |
464 | ASSERT_OK(this->MakeScratchDir()); |
465 | |
466 | auto src_path = this->ScratchPath("threadsafety" ); |
467 | |
468 | std::string data = "foobar" ; |
469 | ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()), |
470 | static_cast<int64_t>(data.size()))); |
471 | |
472 | std::shared_ptr<HdfsReadableFile> file; |
473 | ASSERT_OK(this->client_->OpenReadable(src_path, &file)); |
474 | |
475 | std::atomic<int> correct_count(0); |
476 | int niter = 1000; |
477 | |
478 | auto ReadData = [&file, &correct_count, &data, &niter]() { |
479 | for (int i = 0; i < niter; ++i) { |
480 | std::shared_ptr<Buffer> buffer; |
481 | if (i % 2 == 0) { |
482 | ASSERT_OK(file->ReadAt(3, 3, &buffer)); |
483 | if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) { |
484 | correct_count += 1; |
485 | } |
486 | } else { |
487 | ASSERT_OK(file->ReadAt(0, 4, &buffer)); |
488 | if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) { |
489 | correct_count += 1; |
490 | } |
491 | } |
492 | } |
493 | }; |
494 | |
495 | std::thread thread1(ReadData); |
496 | std::thread thread2(ReadData); |
497 | std::thread thread3(ReadData); |
498 | std::thread thread4(ReadData); |
499 | |
500 | thread1.join(); |
501 | thread2.join(); |
502 | thread3.join(); |
503 | thread4.join(); |
504 | |
505 | ASSERT_EQ(niter * 4, correct_count); |
506 | } |
507 | |
508 | } // namespace io |
509 | } // namespace arrow |
510 | |