1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <hdfs.h> |
19 | |
20 | #include <errno.h> |
21 | #include <algorithm> |
22 | #include <cerrno> |
23 | #include <cstdint> |
24 | #include <cstring> |
25 | #include <memory> |
26 | #include <mutex> |
27 | #include <sstream> |
28 | #include <string> |
29 | #include <unordered_map> |
30 | #include <utility> |
31 | #include <vector> |
32 | |
33 | #include "arrow/buffer.h" |
34 | #include "arrow/io/hdfs-internal.h" |
35 | #include "arrow/io/hdfs.h" |
36 | #include "arrow/io/interfaces.h" |
37 | #include "arrow/memory_pool.h" |
38 | #include "arrow/status.h" |
39 | #include "arrow/util/logging.h" |
40 | |
41 | using std::size_t; |
42 | |
43 | namespace arrow { |
44 | namespace io { |
45 | |
46 | namespace { |
47 | |
// Render an errno value as "<code> (<description>)". For libhdfs's opaque
// error code 255 (often caused by a correct host but wrong RPC port), a
// troubleshooting hint is appended.
std::string TranslateErrno(int error_code) {
  std::string message = std::to_string(error_code);
  message += " (";
  message += strerror(error_code);
  message += ")";
  if (error_code == 255) {
    // Unknown error can occur if the host is correct but the port is not
    message += " Please check that you are connecting to the correct HDFS RPC port";
  }
  return message;
}
57 | |
58 | } // namespace |
59 | |
// Convert a failed libhdfs call (conventional -1 return) into an early
// Status::IOError return that names the failing call and decodes errno.
// Must be used inside a function returning Status.
#define CHECK_FAILURE(RETURN_VALUE, WHAT)                                               \
  do {                                                                                  \
    if (RETURN_VALUE == -1) {                                                           \
      return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \
    }                                                                                   \
  } while (0)

// Default read buffer size (64 KiB) used when callers do not pass one.
static constexpr int kDefaultHdfsBufferSize = 1 << 16;
68 | |
69 | // ---------------------------------------------------------------------- |
70 | // File reading |
71 | |
72 | class HdfsAnyFileImpl { |
73 | public: |
74 | void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs, |
75 | hdfsFile handle) { |
76 | path_ = path; |
77 | driver_ = driver; |
78 | fs_ = fs; |
79 | file_ = handle; |
80 | is_open_ = true; |
81 | } |
82 | |
83 | Status Seek(int64_t position) { |
84 | int ret = driver_->Seek(fs_, file_, position); |
85 | CHECK_FAILURE(ret, "seek" ); |
86 | return Status::OK(); |
87 | } |
88 | |
89 | Status Tell(int64_t* offset) { |
90 | int64_t ret = driver_->Tell(fs_, file_); |
91 | CHECK_FAILURE(ret, "tell" ); |
92 | *offset = ret; |
93 | return Status::OK(); |
94 | } |
95 | |
96 | bool is_open() const { return is_open_; } |
97 | |
98 | protected: |
99 | std::string path_; |
100 | |
101 | internal::LibHdfsShim* driver_; |
102 | |
103 | // For threadsafety |
104 | std::mutex lock_; |
105 | |
106 | // These are pointers in libhdfs, so OK to copy |
107 | hdfsFS fs_; |
108 | hdfsFile file_; |
109 | |
110 | bool is_open_; |
111 | }; |
112 | |
113 | namespace { |
114 | |
115 | Status GetPathInfoFailed(const std::string& path) { |
116 | std::stringstream ss; |
117 | ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno); |
118 | return Status::IOError(ss.str()); |
119 | } |
120 | |
121 | } // namespace |
122 | |
123 | // Private implementation for read-only files |
124 | class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { |
125 | public: |
126 | explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {} |
127 | |
128 | Status Close() { |
129 | if (is_open_) { |
130 | int ret = driver_->CloseFile(fs_, file_); |
131 | CHECK_FAILURE(ret, "CloseFile" ); |
132 | is_open_ = false; |
133 | } |
134 | return Status::OK(); |
135 | } |
136 | |
137 | bool closed() const { return !is_open_; } |
138 | |
139 | Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* buffer) { |
140 | tSize ret; |
141 | if (driver_->HasPread()) { |
142 | ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position), |
143 | reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes)); |
144 | } else { |
145 | std::lock_guard<std::mutex> guard(lock_); |
146 | RETURN_NOT_OK(Seek(position)); |
147 | return Read(nbytes, bytes_read, buffer); |
148 | } |
149 | CHECK_FAILURE(ret, "read" ); |
150 | *bytes_read = ret; |
151 | return Status::OK(); |
152 | } |
153 | |
154 | Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) { |
155 | std::shared_ptr<ResizableBuffer> buffer; |
156 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); |
157 | |
158 | int64_t bytes_read = 0; |
159 | RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); |
160 | |
161 | if (bytes_read < nbytes) { |
162 | RETURN_NOT_OK(buffer->Resize(bytes_read)); |
163 | buffer->ZeroPadding(); |
164 | } |
165 | |
166 | *out = buffer; |
167 | return Status::OK(); |
168 | } |
169 | |
170 | Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) { |
171 | int64_t total_bytes = 0; |
172 | while (total_bytes < nbytes) { |
173 | tSize ret = driver_->Read( |
174 | fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes, |
175 | static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes))); |
176 | CHECK_FAILURE(ret, "read" ); |
177 | total_bytes += ret; |
178 | if (ret == 0) { |
179 | break; |
180 | } |
181 | } |
182 | |
183 | *bytes_read = total_bytes; |
184 | return Status::OK(); |
185 | } |
186 | |
187 | Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
188 | std::shared_ptr<ResizableBuffer> buffer; |
189 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); |
190 | |
191 | int64_t bytes_read = 0; |
192 | RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); |
193 | if (bytes_read < nbytes) { |
194 | RETURN_NOT_OK(buffer->Resize(bytes_read)); |
195 | } |
196 | |
197 | *out = buffer; |
198 | return Status::OK(); |
199 | } |
200 | |
201 | Status GetSize(int64_t* size) { |
202 | hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); |
203 | if (entry == nullptr) { |
204 | return GetPathInfoFailed(path_); |
205 | } |
206 | |
207 | *size = entry->mSize; |
208 | driver_->FreeFileInfo(entry, 1); |
209 | return Status::OK(); |
210 | } |
211 | |
212 | void set_memory_pool(MemoryPool* pool) { pool_ = pool; } |
213 | |
214 | void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; } |
215 | |
216 | private: |
217 | MemoryPool* pool_; |
218 | int32_t buffer_size_; |
219 | }; |
220 | |
// Construct the readable-file wrapper; a null pool falls back to the
// process-wide default memory pool.
HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
  if (pool == nullptr) {
    pool = default_memory_pool();
  }
  impl_.reset(new HdfsReadableFileImpl(pool));
}

// Closing in the destructor is best-effort: a failure only trips a debug check.
HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); }

Status HdfsReadableFile::Close() { return impl_->Close(); }

bool HdfsReadableFile::closed() const { return impl_->closed(); }

// The remaining methods forward directly to the private implementation.
Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
                                void* buffer) {
  return impl_->ReadAt(position, nbytes, bytes_read, buffer);
}

Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes,
                                std::shared_ptr<Buffer>* out) {
  return impl_->ReadAt(position, nbytes, out);
}

Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
  return impl_->Read(nbytes, bytes_read, buffer);
}

Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* buffer) {
  return impl_->Read(nbytes, buffer);
}

Status HdfsReadableFile::GetSize(int64_t* size) { return impl_->GetSize(size); }

Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }

Status HdfsReadableFile::Tell(int64_t* position) const { return impl_->Tell(position); }
257 | |
258 | // ---------------------------------------------------------------------- |
259 | // File writing |
260 | |
261 | // Private implementation for writable-only files |
262 | class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { |
263 | public: |
264 | HdfsOutputStreamImpl() {} |
265 | |
266 | Status Close() { |
267 | if (is_open_) { |
268 | RETURN_NOT_OK(Flush()); |
269 | int ret = driver_->CloseFile(fs_, file_); |
270 | CHECK_FAILURE(ret, "CloseFile" ); |
271 | is_open_ = false; |
272 | } |
273 | return Status::OK(); |
274 | } |
275 | |
276 | bool closed() const { return !is_open_; } |
277 | |
278 | Status Flush() { |
279 | int ret = driver_->Flush(fs_, file_); |
280 | CHECK_FAILURE(ret, "Flush" ); |
281 | return Status::OK(); |
282 | } |
283 | |
284 | Status Write(const void* buffer, int64_t nbytes, int64_t* bytes_written) { |
285 | std::lock_guard<std::mutex> guard(lock_); |
286 | tSize ret = driver_->Write(fs_, file_, reinterpret_cast<const void*>(buffer), |
287 | static_cast<tSize>(nbytes)); |
288 | CHECK_FAILURE(ret, "Write" ); |
289 | *bytes_written = ret; |
290 | return Status::OK(); |
291 | } |
292 | }; |
293 | |
HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); }

// Closing in the destructor is best-effort: a failure only trips a debug check.
HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); }

Status HdfsOutputStream::Close() { return impl_->Close(); }

bool HdfsOutputStream::closed() const { return impl_->closed(); }

// NOTE: the out-parameter is declared `bytes_read` in the public signature
// but receives the number of bytes written.
Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes, int64_t* bytes_read) {
  return impl_->Write(buffer, nbytes, bytes_read);
}

// Convenience overload that discards the bytes-written count.
Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) {
  int64_t bytes_written_dummy = 0;
  return Write(buffer, nbytes, &bytes_written_dummy);
}

Status HdfsOutputStream::Flush() { return impl_->Flush(); }

Status HdfsOutputStream::Tell(int64_t* position) const { return impl_->Tell(position); }
314 | |
315 | // ---------------------------------------------------------------------- |
316 | // HDFS client |
317 | |
318 | // TODO(wesm): this could throw std::bad_alloc in the course of copying strings |
319 | // into the path info object |
320 | static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) { |
321 | out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY; |
322 | out->name = std::string(input->mName); |
323 | out->owner = std::string(input->mOwner); |
324 | out->group = std::string(input->mGroup); |
325 | |
326 | out->last_access_time = static_cast<int32_t>(input->mLastAccess); |
327 | out->last_modified_time = static_cast<int32_t>(input->mLastMod); |
328 | out->size = static_cast<int64_t>(input->mSize); |
329 | |
330 | out->replication = input->mReplication; |
331 | out->block_size = input->mBlockSize; |
332 | |
333 | out->permissions = input->mPermissions; |
334 | } |
335 | |
// Private implementation
//
// Thin veneer over the loaded libhdfs/libhdfs3 shim; each method forwards to
// the corresponding driver call and converts failures to Status.
class HadoopFileSystem::HadoopFileSystemImpl {
 public:
  HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}

  // Load the requested driver library and connect to the namenode described
  // by `config`. On success the connection parameters are cached in members.
  Status Connect(const HdfsConnectionConfig* config) {
    if (config->driver == HdfsDriver::LIBHDFS3) {
      RETURN_NOT_OK(ConnectLibHdfs3(&driver_));
    } else {
      RETURN_NOT_OK(ConnectLibHdfs(&driver_));
    }

    // connect to HDFS with the builder object
    hdfsBuilder* builder = driver_->NewBuilder();
    if (!config->host.empty()) {
      driver_->BuilderSetNameNode(builder, config->host.c_str());
    }
    driver_->BuilderSetNameNodePort(builder, static_cast<tPort>(config->port));
    if (!config->user.empty()) {
      driver_->BuilderSetUserName(builder, config->user.c_str());
    }
    if (!config->kerb_ticket.empty()) {
      driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
    }

    // Apply any extra configuration key/value pairs.
    // NOTE(review): an early return here via CHECK_FAILURE appears to leak
    // `builder`, which is otherwise consumed by BuilderConnect below —
    // confirm the shim's ownership semantics for this path.
    for (auto& kv : config->extra_conf) {
      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
      CHECK_FAILURE(ret, "confsetstr");
    }

    driver_->BuilderSetForceNewInstance(builder);
    fs_ = driver_->BuilderConnect(builder);

    if (fs_ == nullptr) {
      return Status::IOError("HDFS connection failed");
    }
    // Cache connection parameters for later inspection.
    namenode_host_ = config->host;
    port_ = config->port;
    user_ = config->user;
    kerb_ticket_ = config->kerb_ticket;

    return Status::OK();
  }

  Status MakeDirectory(const std::string& path) {
    int ret = driver_->MakeDirectory(fs_, path.c_str());
    CHECK_FAILURE(ret, "create directory");
    return Status::OK();
  }

  // Delete a file, or a directory (recursively if requested).
  Status Delete(const std::string& path, bool recursive) {
    int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
    CHECK_FAILURE(ret, "delete");
    return Status::OK();
  }

  Status Disconnect() {
    int ret = driver_->Disconnect(fs_);
    CHECK_FAILURE(ret, "hdfsFS::Disconnect");
    return Status::OK();
  }

  bool Exists(const std::string& path) {
    // hdfsExists does not distinguish between RPC failure and the file not
    // existing
    int ret = driver_->Exists(fs_, path.c_str());
    return ret == 0;
  }

  // Total capacity of the filesystem in bytes.
  Status GetCapacity(int64_t* nbytes) {
    tOffset ret = driver_->GetCapacity(fs_);
    CHECK_FAILURE(ret, "GetCapacity");
    *nbytes = ret;
    return Status::OK();
  }

  // Bytes currently used on the filesystem.
  Status GetUsed(int64_t* nbytes) {
    tOffset ret = driver_->GetUsed(fs_);
    CHECK_FAILURE(ret, "GetUsed");
    *nbytes = ret;
    return Status::OK();
  }

  // Stat a single path, converting the libhdfs record into HdfsPathInfo.
  Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());

    if (entry == nullptr) {
      return GetPathInfoFailed(path);
    }

    SetPathInfo(entry, info);
    driver_->FreeFileInfo(entry, 1);

    return Status::OK();
  }

  // Reduced stat: only size and kind.
  Status Stat(const std::string& path, FileStatistics* stat) {
    HdfsPathInfo info;
    RETURN_NOT_OK(GetPathInfo(path, &info));

    stat->size = info.size;
    stat->kind = info.kind;
    return Status::OK();
  }

  // List only the names of a directory's entries.
  Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
    std::vector<HdfsPathInfo> detailed_listing;
    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
    for (const auto& info : detailed_listing) {
      listing->push_back(info.name);
    }
    return Status::OK();
  }

  // Append a directory's entries (full path info) to `listing`.
  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
    int num_entries = 0;
    errno = 0;
    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);

    if (entries == nullptr) {
      // If the directory is empty, entries is NULL but errno is 0. Non-zero
      // errno indicates error
      //
      // Note: errno is thread-local
      //
      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
      // errno 2/ENOENT for empty directories. To be more robust to this we
      // double check this case
      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
        num_entries = 0;
      } else {
        return Status::IOError("HDFS list directory failed, errno: ",
                               TranslateErrno(errno));
      }
    }

    // Allocate additional space for elements
    int vec_offset = static_cast<int>(listing->size());
    listing->resize(vec_offset + num_entries);

    for (int i = 0; i < num_entries; ++i) {
      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
    }

    // Free libhdfs file info
    driver_->FreeFileInfo(entries, num_entries);

    return Status::OK();
  }

  // Open a file for reading, distinguishing "does not exist" from other
  // open failures in the error message.
  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      std::shared_ptr<HdfsReadableFile>* file) {
    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);

    if (handle == nullptr) {
      const char* msg = !Exists(path) ? "HDFS file does not exist: "
                                      : "HDFS path exists, but opening file failed: ";
      return Status::IOError(msg, path);
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
    (*file)->impl_->set_members(path, driver_, fs_, handle);
    (*file)->impl_->set_buffer_size(buffer_size);

    return Status::OK();
  }

  // Open a file for writing (or appending). Zero values for buffer_size,
  // replication and default_block_size let libhdfs pick its defaults.
  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
                      int16_t replication, int64_t default_block_size,
                      std::shared_ptr<HdfsOutputStream>* file) {
    int flags = O_WRONLY;
    if (append) flags |= O_APPEND;

    hdfsFile handle =
        driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
                          static_cast<tSize>(default_block_size));

    if (handle == nullptr) {
      return Status::IOError("Unable to open file ", path);
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
    (*file)->impl_->set_members(path, driver_, fs_, handle);

    return Status::OK();
  }

  Status Rename(const std::string& src, const std::string& dst) {
    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
    CHECK_FAILURE(ret, "Rename");
    return Status::OK();
  }

  Status Chmod(const std::string& path, int mode) {
    int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
    CHECK_FAILURE(ret, "Chmod");
    return Status::OK();
  }

  // Either owner or group may be null to leave that attribute unchanged.
  Status Chown(const std::string& path, const char* owner, const char* group) {
    int ret = driver_->Chown(fs_, path.c_str(), owner, group);
    CHECK_FAILURE(ret, "Chown");
    return Status::OK();
  }

 private:
  internal::LibHdfsShim* driver_;

  // Cached connection parameters from the last successful Connect().
  std::string namenode_host_;
  std::string user_;
  int port_;
  std::string kerb_ticket_;

  hdfsFS fs_;
};
553 | |
554 | // ---------------------------------------------------------------------- |
555 | // Public API for HDFSClient |
556 | |
HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }

HadoopFileSystem::~HadoopFileSystem() {}

// Factory: construct a filesystem object and connect it per `config`.
// On failure no filesystem is returned (the partially built one is dropped).
Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
                                 std::shared_ptr<HadoopFileSystem>* fs) {
  // ctor is private, make_shared will not work
  *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());

  RETURN_NOT_OK((*fs)->impl_->Connect(config));
  return Status::OK();
}
569 | |
// The following public methods forward directly to the private implementation.

Status HadoopFileSystem::MakeDirectory(const std::string& path) {
  return impl_->MakeDirectory(path);
}

Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
  return impl_->Delete(path, recursive);
}

// Convenience wrapper: recursive delete of a directory.
Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
  return Delete(path, true);
}

Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }

bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }

Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
  return impl_->GetPathInfo(path, info);
}

Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
  return impl_->Stat(path, stat);
}

Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
  return impl_->GetCapacity(nbytes);
}

Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }

Status HadoopFileSystem::GetChildren(const std::string& path,
                                     std::vector<std::string>* listing) {
  return impl_->GetChildren(path, listing);
}

Status HadoopFileSystem::ListDirectory(const std::string& path,
                                       std::vector<HdfsPathInfo>* listing) {
  return impl_->ListDirectory(path, listing);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, file);
}

// Overload using the module default read buffer size.
Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, file);
}

Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      int32_t buffer_size, int16_t replication,
                                      int64_t default_block_size,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
                             file);
}

// Overload: zero buffer size / replication / block size let libhdfs choose.
Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}

Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
  return impl_->Chmod(path, mode);
}

Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
                               const char* group) {
  return impl_->Chown(path, owner, group);
}

Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
  return impl_->Rename(src, dst);
}

// Deprecated in 0.11

// Misspelled predecessors of OpenWritable, kept for backward compatibility.
Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append,
                                       int32_t buffer_size, int16_t replication,
                                       int64_t default_block_size,
                                       std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, buffer_size, replication, default_block_size, file);
}

Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append,
                                       std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}
659 | |
660 | // ---------------------------------------------------------------------- |
661 | // Allow public API users to check whether we are set up correctly |
662 | |
// Probe whether the JNI-based libhdfs driver can be loaded in this process.
Status HaveLibHdfs() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs(&driver);
}

// Probe whether the libhdfs3 (pure C++) driver can be loaded in this process.
Status HaveLibHdfs3() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs3(&driver);
}
672 | |
673 | } // namespace io |
674 | } // namespace arrow |
675 | |