1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <atomic>
19#include <cstdint>
20#include <cstdlib>
21#include <cstring>
22#include <iostream>
23#include <memory>
24#include <sstream> // IWYU pragma: keep
25#include <string>
26#include <thread>
27#include <vector>
28
29#include <gtest/gtest.h>
30
31#include <boost/filesystem.hpp> // NOLINT
32
33#include "arrow/buffer.h"
34#include "arrow/io/hdfs-internal.h"
35#include "arrow/io/hdfs.h"
36#include "arrow/io/interfaces.h"
37#include "arrow/status.h"
38#include "arrow/test-util.h"
39
40namespace arrow {
41namespace io {
42
43std::vector<uint8_t> RandomData(int64_t size) {
44 std::vector<uint8_t> buffer(size);
45 random_bytes(size, 0, buffer.data());
46 return buffer;
47}
48
49struct JNIDriver {
50 static HdfsDriver type;
51};
52
53struct PivotalDriver {
54 static HdfsDriver type;
55};
56
57template <typename DRIVER>
58class TestHadoopFileSystem : public ::testing::Test {
59 public:
60 Status MakeScratchDir() {
61 if (client_->Exists(scratch_dir_)) {
62 RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
63 }
64 return client_->MakeDirectory(scratch_dir_);
65 }
66
67 Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
68 bool append = false, int buffer_size = 0, int16_t replication = 0,
69 int default_block_size = 0) {
70 std::shared_ptr<HdfsOutputStream> file;
71 RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication,
72 default_block_size, &file));
73
74 RETURN_NOT_OK(file->Write(buffer, size));
75 RETURN_NOT_OK(file->Close());
76
77 return Status::OK();
78 }
79
80 std::string ScratchPath(const std::string& name) {
81 std::stringstream ss;
82 ss << scratch_dir_ << "/" << name;
83 return ss.str();
84 }
85
86 std::string HdfsAbsPath(const std::string& relpath) {
87 std::stringstream ss;
88 ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
89 return ss.str();
90 }
91
92 // Set up shared state between unit tests
93 void SetUp() {
94 internal::LibHdfsShim* driver_shim;
95
96 client_ = nullptr;
97 scratch_dir_ =
98 boost::filesystem::unique_path(boost::filesystem::temp_directory_path() /
99 "arrow-hdfs/scratch-%%%%")
100 .string();
101
102 loaded_driver_ = false;
103
104 Status msg;
105
106 if (DRIVER::type == HdfsDriver::LIBHDFS) {
107 msg = ConnectLibHdfs(&driver_shim);
108 if (!msg.ok()) {
109 std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl;
110 return;
111 }
112 } else {
113 msg = ConnectLibHdfs3(&driver_shim);
114 if (!msg.ok()) {
115 std::cout << "Loading libhdfs3 failed, skipping tests gracefully. "
116 << msg.ToString() << std::endl;
117 return;
118 }
119 }
120
121 loaded_driver_ = true;
122
123 const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
124 const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
125 const char* user = std::getenv("ARROW_HDFS_TEST_USER");
126
127 ASSERT_TRUE(user != nullptr) << "Set ARROW_HDFS_TEST_USER";
128
129 conf_.host = host == nullptr ? "localhost" : host;
130 conf_.user = user;
131 conf_.port = port == nullptr ? 20500 : atoi(port);
132 conf_.driver = DRIVER::type;
133
134 ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_));
135 }
136
137 void TearDown() {
138 if (client_) {
139 if (client_->Exists(scratch_dir_)) {
140 EXPECT_OK(client_->Delete(scratch_dir_, true));
141 }
142 EXPECT_OK(client_->Disconnect());
143 }
144 }
145
146 HdfsConnectionConfig conf_;
147 bool loaded_driver_;
148
149 // Resources shared amongst unit tests
150 std::string scratch_dir_;
151 std::shared_ptr<HadoopFileSystem> client_;
152};
153
154template <>
155std::string TestHadoopFileSystem<PivotalDriver>::HdfsAbsPath(const std::string& relpath) {
156 std::stringstream ss;
157 ss << relpath;
158 return ss.str();
159}
160
// Ends the current typed test early when SetUp() could not load the driver.
// Must be expanded inside a member function returning void: it reads
// this->loaded_driver_ and executes `return;`.
//
// The do { ... } while (0) wrapper makes `SKIP_IF_NO_DRIVER();` a single
// statement, so it also works as the body of an unbraced if/else.
#define SKIP_IF_NO_DRIVER()                                    \
  do {                                                         \
    if (!this->loaded_driver_) {                               \
      std::cout << "Driver not loaded, skipping" << std::endl; \
      return;                                                  \
    }                                                          \
  } while (0)
166
167HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
168HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;
169
170typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes;
171TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes);
172
173TYPED_TEST(TestHadoopFileSystem, ConnectsAgain) {
174 SKIP_IF_NO_DRIVER();
175
176 std::shared_ptr<HadoopFileSystem> client;
177 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client));
178 ASSERT_OK(client->Disconnect());
179}
180
181TYPED_TEST(TestHadoopFileSystem, MultipleClients) {
182 SKIP_IF_NO_DRIVER();
183
184 ASSERT_OK(this->MakeScratchDir());
185
186 std::shared_ptr<HadoopFileSystem> client1;
187 std::shared_ptr<HadoopFileSystem> client2;
188 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client1));
189 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client2));
190 ASSERT_OK(client1->Disconnect());
191
192 // client2 continues to function after equivalent client1 has shutdown
193 std::vector<HdfsPathInfo> listing;
194 ASSERT_OK(client2->ListDirectory(this->scratch_dir_, &listing));
195 ASSERT_OK(client2->Disconnect());
196}
197
198TYPED_TEST(TestHadoopFileSystem, MakeDirectory) {
199 SKIP_IF_NO_DRIVER();
200
201 std::string path = this->ScratchPath("create-directory");
202
203 if (this->client_->Exists(path)) {
204 ASSERT_OK(this->client_->Delete(path, true));
205 }
206
207 ASSERT_OK(this->client_->MakeDirectory(path));
208 ASSERT_TRUE(this->client_->Exists(path));
209 std::vector<HdfsPathInfo> listing;
210 EXPECT_OK(this->client_->ListDirectory(path, &listing));
211 ASSERT_EQ(0, listing.size());
212 EXPECT_OK(this->client_->Delete(path, true));
213 ASSERT_FALSE(this->client_->Exists(path));
214 ASSERT_RAISES(IOError, this->client_->ListDirectory(path, &listing));
215}
216
217TYPED_TEST(TestHadoopFileSystem, GetCapacityUsed) {
218 SKIP_IF_NO_DRIVER();
219
220 // Who knows what is actually in your DFS cluster, but expect it to have
221 // positive used bytes and capacity
222 int64_t nbytes = 0;
223 ASSERT_OK(this->client_->GetCapacity(&nbytes));
224 ASSERT_LT(0, nbytes);
225
226 ASSERT_OK(this->client_->GetUsed(&nbytes));
227 ASSERT_LT(0, nbytes);
228}
229
230TYPED_TEST(TestHadoopFileSystem, GetPathInfo) {
231 SKIP_IF_NO_DRIVER();
232
233 HdfsPathInfo info;
234
235 ASSERT_OK(this->MakeScratchDir());
236
237 // Directory info
238 ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info));
239 ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
240 ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name);
241 ASSERT_EQ(this->conf_.user, info.owner);
242
243 // TODO(wesm): test group, other attrs
244
245 auto path = this->ScratchPath("test-file");
246
247 const int size = 100;
248
249 std::vector<uint8_t> buffer = RandomData(size);
250
251 ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size));
252 ASSERT_OK(this->client_->GetPathInfo(path, &info));
253
254 ASSERT_EQ(ObjectType::FILE, info.kind);
255 ASSERT_EQ(this->HdfsAbsPath(path), info.name);
256 ASSERT_EQ(this->conf_.user, info.owner);
257 ASSERT_EQ(size, info.size);
258}
259
260TYPED_TEST(TestHadoopFileSystem, GetPathInfoNotExist) {
261 // ARROW-2919: Test that the error message is reasonable
262 SKIP_IF_NO_DRIVER();
263
264 ASSERT_OK(this->MakeScratchDir());
265 auto path = this->ScratchPath("path-does-not-exist");
266
267 HdfsPathInfo info;
268 Status s = this->client_->GetPathInfo(path, &info);
269 ASSERT_TRUE(s.IsIOError());
270
271 const std::string error_message = s.ToString();
272
273 // Check that the file path is found in the error message
274 ASSERT_LT(error_message.find(path), std::string::npos);
275}
276
277TYPED_TEST(TestHadoopFileSystem, AppendToFile) {
278 SKIP_IF_NO_DRIVER();
279
280 ASSERT_OK(this->MakeScratchDir());
281
282 auto path = this->ScratchPath("test-file");
283 const int size = 100;
284
285 std::vector<uint8_t> buffer = RandomData(size);
286 ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size));
287
288 // now append
289 ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true));
290
291 HdfsPathInfo info;
292 ASSERT_OK(this->client_->GetPathInfo(path, &info));
293 ASSERT_EQ(size * 2, info.size);
294}
295
296TYPED_TEST(TestHadoopFileSystem, ListDirectory) {
297 SKIP_IF_NO_DRIVER();
298
299 const int size = 100;
300 std::vector<uint8_t> data = RandomData(size);
301
302 auto p1 = this->ScratchPath("test-file-1");
303 auto p2 = this->ScratchPath("test-file-2");
304 auto d1 = this->ScratchPath("test-dir-1");
305
306 ASSERT_OK(this->MakeScratchDir());
307 ASSERT_OK(this->WriteDummyFile(p1, data.data(), size));
308 ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2));
309 ASSERT_OK(this->client_->MakeDirectory(d1));
310
311 std::vector<HdfsPathInfo> listing;
312 ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
313
314 // Do it again, appends!
315 ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
316
317 ASSERT_EQ(6, static_cast<int>(listing.size()));
318
319 // Argh, well, shouldn't expect the listing to be in any particular order
320 for (size_t i = 0; i < listing.size(); ++i) {
321 const HdfsPathInfo& info = listing[i];
322 if (info.name == this->HdfsAbsPath(p1)) {
323 ASSERT_EQ(ObjectType::FILE, info.kind);
324 ASSERT_EQ(size, info.size);
325 } else if (info.name == this->HdfsAbsPath(p2)) {
326 ASSERT_EQ(ObjectType::FILE, info.kind);
327 ASSERT_EQ(size / 2, info.size);
328 } else if (info.name == this->HdfsAbsPath(d1)) {
329 ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
330 } else {
331 FAIL() << "Unexpected path: " << info.name;
332 }
333 }
334}
335
336TYPED_TEST(TestHadoopFileSystem, ReadableMethods) {
337 SKIP_IF_NO_DRIVER();
338
339 ASSERT_OK(this->MakeScratchDir());
340
341 auto path = this->ScratchPath("test-file");
342 const int size = 100;
343
344 std::vector<uint8_t> data = RandomData(size);
345 ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
346
347 std::shared_ptr<HdfsReadableFile> file;
348 ASSERT_OK(this->client_->OpenReadable(path, &file));
349
350 // Test GetSize -- move this into its own unit test if ever needed
351 int64_t file_size;
352 ASSERT_OK(file->GetSize(&file_size));
353 ASSERT_EQ(size, file_size);
354
355 uint8_t buffer[50];
356 int64_t bytes_read = 0;
357
358 ASSERT_OK(file->Read(50, &bytes_read, buffer));
359 ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
360 ASSERT_EQ(50, bytes_read);
361
362 ASSERT_OK(file->Read(50, &bytes_read, buffer));
363 ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
364 ASSERT_EQ(50, bytes_read);
365
366 // EOF
367 ASSERT_OK(file->Read(1, &bytes_read, buffer));
368 ASSERT_EQ(0, bytes_read);
369
370 // ReadAt to EOF
371 ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
372 ASSERT_EQ(40, bytes_read);
373 ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
374
375 // Seek, Tell
376 ASSERT_OK(file->Seek(60));
377
378 int64_t position;
379 ASSERT_OK(file->Tell(&position));
380 ASSERT_EQ(60, position);
381}
382
383TYPED_TEST(TestHadoopFileSystem, LargeFile) {
384 SKIP_IF_NO_DRIVER();
385
386 ASSERT_OK(this->MakeScratchDir());
387
388 auto path = this->ScratchPath("test-large-file");
389 const int size = 1000000;
390
391 std::vector<uint8_t> data = RandomData(size);
392 ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
393
394 std::shared_ptr<HdfsReadableFile> file;
395 ASSERT_OK(this->client_->OpenReadable(path, &file));
396
397 ASSERT_FALSE(file->closed());
398
399 std::shared_ptr<Buffer> buffer;
400 ASSERT_OK(AllocateBuffer(nullptr, size, &buffer));
401
402 int64_t bytes_read = 0;
403
404 ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data()));
405 ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size));
406 ASSERT_EQ(size, bytes_read);
407
408 // explicit buffer size
409 std::shared_ptr<HdfsReadableFile> file2;
410 ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));
411
412 std::shared_ptr<Buffer> buffer2;
413 ASSERT_OK(AllocateBuffer(nullptr, size, &buffer2));
414
415 ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data()));
416 ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
417 ASSERT_EQ(size, bytes_read);
418}
419
420TYPED_TEST(TestHadoopFileSystem, RenameFile) {
421 SKIP_IF_NO_DRIVER();
422 ASSERT_OK(this->MakeScratchDir());
423
424 auto src_path = this->ScratchPath("src-file");
425 auto dst_path = this->ScratchPath("dst-file");
426 const int size = 100;
427
428 std::vector<uint8_t> data = RandomData(size);
429 ASSERT_OK(this->WriteDummyFile(src_path, data.data(), size));
430
431 ASSERT_OK(this->client_->Rename(src_path, dst_path));
432
433 ASSERT_FALSE(this->client_->Exists(src_path));
434 ASSERT_TRUE(this->client_->Exists(dst_path));
435}
436
437TYPED_TEST(TestHadoopFileSystem, ChmodChown) {
438 SKIP_IF_NO_DRIVER();
439 ASSERT_OK(this->MakeScratchDir());
440
441 auto path = this->ScratchPath("path-to-chmod");
442
443 int16_t mode = 0755;
444 const int size = 100;
445
446 std::vector<uint8_t> data = RandomData(size);
447 ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
448
449 HdfsPathInfo info;
450 ASSERT_OK(this->client_->Chmod(path, mode));
451 ASSERT_OK(this->client_->GetPathInfo(path, &info));
452 ASSERT_EQ(mode, info.permissions);
453
454 std::string owner = "hadoop";
455 std::string group = "hadoop";
456 ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str()));
457 ASSERT_OK(this->client_->GetPathInfo(path, &info));
458 ASSERT_EQ("hadoop", info.owner);
459 ASSERT_EQ("hadoop", info.group);
460}
461
462TYPED_TEST(TestHadoopFileSystem, ThreadSafety) {
463 SKIP_IF_NO_DRIVER();
464 ASSERT_OK(this->MakeScratchDir());
465
466 auto src_path = this->ScratchPath("threadsafety");
467
468 std::string data = "foobar";
469 ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()),
470 static_cast<int64_t>(data.size())));
471
472 std::shared_ptr<HdfsReadableFile> file;
473 ASSERT_OK(this->client_->OpenReadable(src_path, &file));
474
475 std::atomic<int> correct_count(0);
476 int niter = 1000;
477
478 auto ReadData = [&file, &correct_count, &data, &niter]() {
479 for (int i = 0; i < niter; ++i) {
480 std::shared_ptr<Buffer> buffer;
481 if (i % 2 == 0) {
482 ASSERT_OK(file->ReadAt(3, 3, &buffer));
483 if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) {
484 correct_count += 1;
485 }
486 } else {
487 ASSERT_OK(file->ReadAt(0, 4, &buffer));
488 if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) {
489 correct_count += 1;
490 }
491 }
492 }
493 };
494
495 std::thread thread1(ReadData);
496 std::thread thread2(ReadData);
497 std::thread thread3(ReadData);
498 std::thread thread4(ReadData);
499
500 thread1.join();
501 thread2.join();
502 thread3.join();
503 thread4.join();
504
505 ASSERT_EQ(niter * 4, correct_count);
506}
507
508} // namespace io
509} // namespace arrow
510