1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <hdfs.h>
19
20#include <errno.h>
21#include <algorithm>
22#include <cerrno>
23#include <cstdint>
24#include <cstring>
25#include <memory>
26#include <mutex>
27#include <sstream>
28#include <string>
29#include <unordered_map>
30#include <utility>
31#include <vector>
32
33#include "arrow/buffer.h"
34#include "arrow/io/hdfs-internal.h"
35#include "arrow/io/hdfs.h"
36#include "arrow/io/interfaces.h"
37#include "arrow/memory_pool.h"
38#include "arrow/status.h"
39#include "arrow/util/logging.h"
40
41using std::size_t;
42
43namespace arrow {
44namespace io {
45
46namespace {
47
// Render an errno value as "<code> (<message>)" for inclusion in error
// Statuses. Error code 255 gets an extra hint, since libhdfs reports it when
// the namenode host is reachable but the port is not an HDFS RPC endpoint.
std::string TranslateErrno(int error_code) {
  std::stringstream builder;
  builder << error_code << " (" << strerror(error_code) << ")";
  if (error_code == 255) {
    // Unknown error can occur if the host is correct but the port is not
    builder << " Please check that you are connecting to the correct HDFS RPC port";
  }
  return builder.str();
}
57
58} // namespace
59
// Turn a libhdfs-style return code into an early `return Status::IOError(...)`
// from the enclosing function when it equals -1 (the libhdfs failure value),
// including the translated errno. Wrapped in do/while(0) so it behaves as a
// single statement after an unbraced `if`.
#define CHECK_FAILURE(RETURN_VALUE, WHAT)                                            \
  do {                                                                               \
    if (RETURN_VALUE == -1) {                                                        \
      return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \
    }                                                                                \
  } while (0)

// Default read buffer size (64 KiB) used when callers do not specify one.
static constexpr int kDefaultHdfsBufferSize = 1 << 16;
68
69// ----------------------------------------------------------------------
70// File reading
71
72class HdfsAnyFileImpl {
73 public:
74 void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs,
75 hdfsFile handle) {
76 path_ = path;
77 driver_ = driver;
78 fs_ = fs;
79 file_ = handle;
80 is_open_ = true;
81 }
82
83 Status Seek(int64_t position) {
84 int ret = driver_->Seek(fs_, file_, position);
85 CHECK_FAILURE(ret, "seek");
86 return Status::OK();
87 }
88
89 Status Tell(int64_t* offset) {
90 int64_t ret = driver_->Tell(fs_, file_);
91 CHECK_FAILURE(ret, "tell");
92 *offset = ret;
93 return Status::OK();
94 }
95
96 bool is_open() const { return is_open_; }
97
98 protected:
99 std::string path_;
100
101 internal::LibHdfsShim* driver_;
102
103 // For threadsafety
104 std::mutex lock_;
105
106 // These are pointers in libhdfs, so OK to copy
107 hdfsFS fs_;
108 hdfsFile file_;
109
110 bool is_open_;
111};
112
113namespace {
114
115Status GetPathInfoFailed(const std::string& path) {
116 std::stringstream ss;
117 ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno);
118 return Status::IOError(ss.str());
119}
120
121} // namespace
122
123// Private implementation for read-only files
124class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
125 public:
126 explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}
127
128 Status Close() {
129 if (is_open_) {
130 int ret = driver_->CloseFile(fs_, file_);
131 CHECK_FAILURE(ret, "CloseFile");
132 is_open_ = false;
133 }
134 return Status::OK();
135 }
136
137 bool closed() const { return !is_open_; }
138
139 Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* buffer) {
140 tSize ret;
141 if (driver_->HasPread()) {
142 ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position),
143 reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes));
144 } else {
145 std::lock_guard<std::mutex> guard(lock_);
146 RETURN_NOT_OK(Seek(position));
147 return Read(nbytes, bytes_read, buffer);
148 }
149 CHECK_FAILURE(ret, "read");
150 *bytes_read = ret;
151 return Status::OK();
152 }
153
154 Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
155 std::shared_ptr<ResizableBuffer> buffer;
156 RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
157
158 int64_t bytes_read = 0;
159 RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
160
161 if (bytes_read < nbytes) {
162 RETURN_NOT_OK(buffer->Resize(bytes_read));
163 buffer->ZeroPadding();
164 }
165
166 *out = buffer;
167 return Status::OK();
168 }
169
170 Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
171 int64_t total_bytes = 0;
172 while (total_bytes < nbytes) {
173 tSize ret = driver_->Read(
174 fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes,
175 static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes)));
176 CHECK_FAILURE(ret, "read");
177 total_bytes += ret;
178 if (ret == 0) {
179 break;
180 }
181 }
182
183 *bytes_read = total_bytes;
184 return Status::OK();
185 }
186
187 Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
188 std::shared_ptr<ResizableBuffer> buffer;
189 RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
190
191 int64_t bytes_read = 0;
192 RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
193 if (bytes_read < nbytes) {
194 RETURN_NOT_OK(buffer->Resize(bytes_read));
195 }
196
197 *out = buffer;
198 return Status::OK();
199 }
200
201 Status GetSize(int64_t* size) {
202 hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str());
203 if (entry == nullptr) {
204 return GetPathInfoFailed(path_);
205 }
206
207 *size = entry->mSize;
208 driver_->FreeFileInfo(entry, 1);
209 return Status::OK();
210 }
211
212 void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
213
214 void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }
215
216 private:
217 MemoryPool* pool_;
218 int32_t buffer_size_;
219};
220
221HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
222 if (pool == nullptr) {
223 pool = default_memory_pool();
224 }
225 impl_.reset(new HdfsReadableFileImpl(pool));
226}
227
// Destructor closes the file; a close failure here only trips a debug check,
// since destructors cannot return a Status.
HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); }

Status HdfsReadableFile::Close() { return impl_->Close(); }

bool HdfsReadableFile::closed() const { return impl_->closed(); }

// Positioned read into a caller-owned buffer; *bytes_read may be < nbytes.
Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
                                void* buffer) {
  return impl_->ReadAt(position, nbytes, bytes_read, buffer);
}

// Positioned read returning a newly allocated Buffer.
Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes,
                                std::shared_ptr<Buffer>* out) {
  return impl_->ReadAt(position, nbytes, out);
}

// Sequential read into a caller-owned buffer, advancing the cursor.
Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
  return impl_->Read(nbytes, bytes_read, buffer);
}

// Sequential read returning a newly allocated Buffer.
Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* buffer) {
  return impl_->Read(nbytes, buffer);
}

Status HdfsReadableFile::GetSize(int64_t* size) { return impl_->GetSize(size); }

Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }

Status HdfsReadableFile::Tell(int64_t* position) const { return impl_->Tell(position); }
257
258// ----------------------------------------------------------------------
259// File writing
260
261// Private implementation for writable-only files
262class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
263 public:
264 HdfsOutputStreamImpl() {}
265
266 Status Close() {
267 if (is_open_) {
268 RETURN_NOT_OK(Flush());
269 int ret = driver_->CloseFile(fs_, file_);
270 CHECK_FAILURE(ret, "CloseFile");
271 is_open_ = false;
272 }
273 return Status::OK();
274 }
275
276 bool closed() const { return !is_open_; }
277
278 Status Flush() {
279 int ret = driver_->Flush(fs_, file_);
280 CHECK_FAILURE(ret, "Flush");
281 return Status::OK();
282 }
283
284 Status Write(const void* buffer, int64_t nbytes, int64_t* bytes_written) {
285 std::lock_guard<std::mutex> guard(lock_);
286 tSize ret = driver_->Write(fs_, file_, reinterpret_cast<const void*>(buffer),
287 static_cast<tSize>(nbytes));
288 CHECK_FAILURE(ret, "Write");
289 *bytes_written = ret;
290 return Status::OK();
291 }
292};
293
HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); }

// Destructor flushes and closes; failures only trip a debug check since a
// destructor cannot return a Status.
HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); }

Status HdfsOutputStream::Close() { return impl_->Close(); }

bool HdfsOutputStream::closed() const { return impl_->closed(); }

// NOTE(review): the out-parameter is named bytes_read but actually receives
// the number of bytes written (see HdfsOutputStreamImpl::Write).
Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes, int64_t* bytes_read) {
  return impl_->Write(buffer, nbytes, bytes_read);
}

// Convenience overload that discards the bytes-written count.
Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) {
  int64_t bytes_written_dummy = 0;
  return Write(buffer, nbytes, &bytes_written_dummy);
}

Status HdfsOutputStream::Flush() { return impl_->Flush(); }

Status HdfsOutputStream::Tell(int64_t* position) const { return impl_->Tell(position); }
314
315// ----------------------------------------------------------------------
316// HDFS client
317
318// TODO(wesm): this could throw std::bad_alloc in the course of copying strings
319// into the path info object
320static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
321 out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
322 out->name = std::string(input->mName);
323 out->owner = std::string(input->mOwner);
324 out->group = std::string(input->mGroup);
325
326 out->last_access_time = static_cast<int32_t>(input->mLastAccess);
327 out->last_modified_time = static_cast<int32_t>(input->mLastMod);
328 out->size = static_cast<int64_t>(input->mSize);
329
330 out->replication = input->mReplication;
331 out->block_size = input->mBlockSize;
332
333 out->permissions = input->mPermissions;
334}
335
336// Private implementation
// Private implementation
//
// Wraps a libhdfs (or libhdfs3) connection. All calls delegate to the loaded
// driver shim; libhdfs reports failures via -1/NULL returns plus errno, which
// CHECK_FAILURE and GetPathInfoFailed translate into Status::IOError.
class HadoopFileSystem::HadoopFileSystemImpl {
 public:
  HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}

  // Load the requested driver library and connect to the namenode described
  // by config (host, port, user, Kerberos ticket, extra key/value conf).
  Status Connect(const HdfsConnectionConfig* config) {
    if (config->driver == HdfsDriver::LIBHDFS3) {
      RETURN_NOT_OK(ConnectLibHdfs3(&driver_));
    } else {
      RETURN_NOT_OK(ConnectLibHdfs(&driver_));
    }

    // connect to HDFS with the builder object
    hdfsBuilder* builder = driver_->NewBuilder();
    if (!config->host.empty()) {
      driver_->BuilderSetNameNode(builder, config->host.c_str());
    }
    driver_->BuilderSetNameNodePort(builder, static_cast<tPort>(config->port));
    if (!config->user.empty()) {
      driver_->BuilderSetUserName(builder, config->user.c_str());
    }
    if (!config->kerb_ticket.empty()) {
      driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
    }

    // Forward arbitrary Hadoop configuration key/value pairs.
    for (auto& kv : config->extra_conf) {
      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
      CHECK_FAILURE(ret, "confsetstr");
    }

    // Force a fresh hdfsFS rather than a cached one, then connect. NOTE:
    // presumably BuilderConnect consumes/frees the builder, as in libhdfs's
    // hdfsBuilderConnect — confirm against the shim implementation.
    driver_->BuilderSetForceNewInstance(builder);
    fs_ = driver_->BuilderConnect(builder);

    if (fs_ == nullptr) {
      return Status::IOError("HDFS connection failed");
    }
    // Remember the connection parameters for introspection.
    namenode_host_ = config->host;
    port_ = config->port;
    user_ = config->user;
    kerb_ticket_ = config->kerb_ticket;

    return Status::OK();
  }

  // Create a directory (and, per libhdfs semantics, any missing parents).
  Status MakeDirectory(const std::string& path) {
    int ret = driver_->MakeDirectory(fs_, path.c_str());
    CHECK_FAILURE(ret, "create directory");
    return Status::OK();
  }

  // Delete a file or directory; recursive must be true for non-empty dirs.
  Status Delete(const std::string& path, bool recursive) {
    int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
    CHECK_FAILURE(ret, "delete");
    return Status::OK();
  }

  // Disconnect from the namenode. fs_ is left dangling afterwards; callers
  // must not issue further operations on this instance.
  Status Disconnect() {
    int ret = driver_->Disconnect(fs_);
    CHECK_FAILURE(ret, "hdfsFS::Disconnect");
    return Status::OK();
  }

  bool Exists(const std::string& path) {
    // hdfsExists does not distinguish between RPC failure and the file not
    // existing
    int ret = driver_->Exists(fs_, path.c_str());
    return ret == 0;
  }

  // Total capacity of the filesystem, in bytes.
  Status GetCapacity(int64_t* nbytes) {
    tOffset ret = driver_->GetCapacity(fs_);
    CHECK_FAILURE(ret, "GetCapacity");
    *nbytes = ret;
    return Status::OK();
  }

  // Bytes currently used on the filesystem.
  Status GetUsed(int64_t* nbytes) {
    tOffset ret = driver_->GetUsed(fs_);
    CHECK_FAILURE(ret, "GetUsed");
    *nbytes = ret;
    return Status::OK();
  }

  // Stat a single path; fails with IOError if the path does not exist.
  Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());

    if (entry == nullptr) {
      return GetPathInfoFailed(path);
    }

    SetPathInfo(entry, info);
    driver_->FreeFileInfo(entry, 1);

    return Status::OK();
  }

  // Subset of GetPathInfo exposed through the generic FileStatistics struct.
  Status Stat(const std::string& path, FileStatistics* stat) {
    HdfsPathInfo info;
    RETURN_NOT_OK(GetPathInfo(path, &info));

    stat->size = info.size;
    stat->kind = info.kind;
    return Status::OK();
  }

  // Names-only variant of ListDirectory.
  Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
    std::vector<HdfsPathInfo> detailed_listing;
    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
    for (const auto& info : detailed_listing) {
      listing->push_back(info.name);
    }
    return Status::OK();
  }

  // Append the directory's entries to *listing (existing contents are kept).
  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
    int num_entries = 0;
    // errno is cleared first because a NULL result is ambiguous (see below).
    errno = 0;
    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);

    if (entries == nullptr) {
      // If the directory is empty, entries is NULL but errno is 0. Non-zero
      // errno indicates error
      //
      // Note: errno is thread-local
      //
      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
      // errno 2/ENOENT for empty directories. To be more robust to this we
      // double check this case
      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
        num_entries = 0;
      } else {
        return Status::IOError("HDFS list directory failed, errno: ",
                               TranslateErrno(errno));
      }
    }

    // Allocate additional space for elements
    int vec_offset = static_cast<int>(listing->size());
    listing->resize(vec_offset + num_entries);

    for (int i = 0; i < num_entries; ++i) {
      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
    }

    // Free libhdfs file info
    driver_->FreeFileInfo(entries, num_entries);

    return Status::OK();
  }

  // Open a file for reading, wiring the handle into a new HdfsReadableFile.
  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      std::shared_ptr<HdfsReadableFile>* file) {
    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);

    if (handle == nullptr) {
      // Distinguish "missing file" from "exists but unreadable" for the error.
      const char* msg = !Exists(path) ? "HDFS file does not exist: "
                                      : "HDFS path exists, but opening file failed: ";
      return Status::IOError(msg, path);
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
    (*file)->impl_->set_members(path, driver_, fs_, handle);
    (*file)->impl_->set_buffer_size(buffer_size);

    return Status::OK();
  }

  // Open a file for writing or appending. Zero values for buffer_size,
  // replication and default_block_size select the server-side defaults.
  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
                      int16_t replication, int64_t default_block_size,
                      std::shared_ptr<HdfsOutputStream>* file) {
    int flags = O_WRONLY;
    if (append) flags |= O_APPEND;

    // NOTE(review): default_block_size is narrowed to tSize (int32) to match
    // the shim's OpenFile signature; block sizes >= 2 GiB would truncate.
    hdfsFile handle =
        driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
                          static_cast<tSize>(default_block_size));

    if (handle == nullptr) {
      return Status::IOError("Unable to open file ", path);
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
    (*file)->impl_->set_members(path, driver_, fs_, handle);

    return Status::OK();
  }

  // Rename (move) src to dst.
  Status Rename(const std::string& src, const std::string& dst) {
    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
    CHECK_FAILURE(ret, "Rename");
    return Status::OK();
  }

  // Change POSIX-style permissions on a path.
  Status Chmod(const std::string& path, int mode) {
    int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
    CHECK_FAILURE(ret, "Chmod");
    return Status::OK();
  }

  // Change owner and/or group of a path (libhdfs accepts NULL to leave one
  // of them unchanged).
  Status Chown(const std::string& path, const char* owner, const char* group) {
    int ret = driver_->Chown(fs_, path.c_str(), owner, group);
    CHECK_FAILURE(ret, "Chown");
    return Status::OK();
  }

 private:
  // Loaded libhdfs/libhdfs3 function table (not owned).
  internal::LibHdfsShim* driver_;

  // Connection parameters captured in Connect().
  std::string namenode_host_;
  std::string user_;
  int port_;
  std::string kerb_ticket_;

  // libhdfs filesystem handle; valid between Connect() and Disconnect().
  hdfsFS fs_;
};
553
554// ----------------------------------------------------------------------
555// Public API for HDFSClient
556
// Construction does not connect; callers must use the static Connect().
HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }

HadoopFileSystem::~HadoopFileSystem() {}
560
561Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
562 std::shared_ptr<HadoopFileSystem>* fs) {
563 // ctor is private, make_shared will not work
564 *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());
565
566 RETURN_NOT_OK((*fs)->impl_->Connect(config));
567 return Status::OK();
568}
569
// --- Thin public forwarders into HadoopFileSystemImpl ---

Status HadoopFileSystem::MakeDirectory(const std::string& path) {
  return impl_->MakeDirectory(path);
}

Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
  return impl_->Delete(path, recursive);
}

// Convenience wrapper: recursive delete of a directory tree.
Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
  return Delete(path, true);
}

Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }

bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }

Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
  return impl_->GetPathInfo(path, info);
}

Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
  return impl_->Stat(path, stat);
}

Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
  return impl_->GetCapacity(nbytes);
}

Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }

Status HadoopFileSystem::GetChildren(const std::string& path,
                                     std::vector<std::string>* listing) {
  return impl_->GetChildren(path, listing);
}

Status HadoopFileSystem::ListDirectory(const std::string& path,
                                       std::vector<HdfsPathInfo>* listing) {
  return impl_->ListDirectory(path, listing);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, file);
}

// Overload using the default 64 KiB read buffer.
Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, file);
}

Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      int32_t buffer_size, int16_t replication,
                                      int64_t default_block_size,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
                             file);
}

// Overload passing zeros, which selects libhdfs's server-side defaults for
// buffer size, replication and block size.
Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}

Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
  return impl_->Chmod(path, mode);
}

Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
                               const char* group) {
  return impl_->Chown(path, owner, group);
}

Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
  return impl_->Rename(src, dst);
}

// Deprecated in 0.11

// Misspelled legacy aliases kept for backwards compatibility; forward to the
// correctly spelled OpenWritable overloads.
Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append,
                                       int32_t buffer_size, int16_t replication,
                                       int64_t default_block_size,
                                       std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, buffer_size, replication, default_block_size, file);
}

Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append,
                                       std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}
659
660// ----------------------------------------------------------------------
661// Allow public API users to check whether we are set up correctly
662
// Probe whether the libhdfs shared library can be located and loaded; returns
// the driver-loading Status without establishing a connection.
Status HaveLibHdfs() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs(&driver);
}

// Same probe for the alternative libhdfs3 implementation.
Status HaveLibHdfs3() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs3(&driver);
}
672
673} // namespace io
674} // namespace arrow
675