1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// This shim interface to libhdfs (for runtime shared library loading) has been
19// adapted from the SFrame project, released under the ASF-compatible 3-clause
20// BSD license
21//
// Using this requires the $JAVA_HOME and $HADOOP_HOME environment variables
// to be set, so that libjvm and libhdfs can be located easily
24
25// Copyright (C) 2015 Dato, Inc.
26// All rights reserved.
27//
28// This software may be modified and distributed under the terms
29// of the BSD license. See the LICENSE file for details.
30
31#include "arrow/io/hdfs-internal.h"
32
33#include <cstdint>
34#include <cstdlib>
35#include <mutex>
36#include <sstream> // IWYU pragma: keep
37#include <string>
38#include <vector>
39
40#ifndef _WIN32
41#include <dlfcn.h>
42#endif
43
44#include <boost/filesystem.hpp> // NOLINT
45
46#include "arrow/status.h"
47#include "arrow/util/logging.h"
48
49namespace fs = boost::filesystem;
50
// Handle of the dynamically loaded JVM library. It starts out null and is
// passed to try_dlopen() as the output handle when ConnectLibHdfs() makes its
// first load attempt.
#ifdef _WIN32
static HINSTANCE libjvm_handle = nullptr;
#else
static void* libjvm_handle = nullptr;
#endif
56/*
57 * All the shim pointers
58 */
59
60// Helper functions for dlopens
61static std::vector<fs::path> get_potential_libjvm_paths();
62static std::vector<fs::path> get_potential_libhdfs_paths();
63static std::vector<fs::path> get_potential_libhdfs3_paths();
64static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
65#ifndef _WIN32
66 void*& out_handle);
67#else
68 HINSTANCE& out_handle);
69#endif
70
71static std::vector<fs::path> get_potential_libhdfs_paths() {
72 std::vector<fs::path> libhdfs_potential_paths;
73 std::string file_name;
74
75// OS-specific file name
76#ifdef _WIN32
77 file_name = "hdfs.dll";
78#elif __APPLE__
79 file_name = "libhdfs.dylib";
80#else
81 file_name = "libhdfs.so";
82#endif
83
84 // Common paths
85 std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
86
87 // Path from environment variable
88 const char* hadoop_home = std::getenv("HADOOP_HOME");
89 if (hadoop_home != nullptr) {
90 auto path = fs::path(hadoop_home) / "lib/native";
91 search_paths.push_back(path);
92 }
93
94 const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR");
95 if (libhdfs_dir != nullptr) {
96 search_paths.push_back(fs::path(libhdfs_dir));
97 }
98
99 // All paths with file name
100 for (auto& path : search_paths) {
101 libhdfs_potential_paths.push_back(path / file_name);
102 }
103
104 return libhdfs_potential_paths;
105}
106
107static std::vector<fs::path> get_potential_libhdfs3_paths() {
108 std::vector<fs::path> potential_paths;
109 std::string file_name;
110
111// OS-specific file name
112#ifdef _WIN32
113 file_name = "hdfs3.dll";
114#elif __APPLE__
115 file_name = "libhdfs3.dylib";
116#else
117 file_name = "libhdfs3.so";
118#endif
119
120 // Common paths
121 std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
122
123 const char* libhdfs3_dir = std::getenv("ARROW_LIBHDFS3_DIR");
124 if (libhdfs3_dir != nullptr) {
125 search_paths.push_back(fs::path(libhdfs3_dir));
126 }
127
128 // All paths with file name
129 for (auto& path : search_paths) {
130 potential_paths.push_back(path / file_name);
131 }
132
133 return potential_paths;
134}
135
136static std::vector<fs::path> get_potential_libjvm_paths() {
137 std::vector<fs::path> libjvm_potential_paths;
138
139 std::vector<fs::path> search_prefixes;
140 std::vector<fs::path> search_suffixes;
141 std::string file_name;
142
143// From heuristics
144#ifdef __WIN32
145 search_prefixes = {""};
146 search_suffixes = {"/jre/bin/server", "/bin/server"};
147 file_name = "jvm.dll";
148#elif __APPLE__
149 search_prefixes = {""};
150 search_suffixes = {"", "/jre/lib/server", "/lib/server"};
151 file_name = "libjvm.dylib";
152
153// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
154// expecting users to set an environment variable
155#else
156 search_prefixes = {
157 "/usr/lib/jvm/default-java", // ubuntu / debian distros
158 "/usr/lib/jvm/java", // rhel6
159 "/usr/lib/jvm", // centos6
160 "/usr/lib64/jvm", // opensuse 13
161 "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros
162 "/usr/local/lib/jvm/java", // alt rhel6
163 "/usr/local/lib/jvm", // alt centos6
164 "/usr/local/lib64/jvm", // alt opensuse 13
165 "/usr/local/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros
166 "/usr/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros
167 "/usr/local/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros
168 "/usr/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros
169 "/usr/lib/jvm/java-7-oracle", // alt ubuntu
170 "/usr/lib/jvm/java-8-oracle", // alt ubuntu
171 "/usr/lib/jvm/java-6-oracle", // alt ubuntu
172 "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu
173 "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu
174 "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu
175 "/usr/lib/jvm/default", // alt centos
176 "/usr/java/latest", // alt centos
177 };
178 search_suffixes = {"", "/jre/lib/amd64/server", "/lib/amd64/server"};
179 file_name = "libjvm.so";
180#endif
181 // From direct environment variable
182 char* env_value = NULL;
183 if ((env_value = getenv("JAVA_HOME")) != NULL) {
184 search_prefixes.insert(search_prefixes.begin(), env_value);
185 }
186
187 // Generate cross product between search_prefixes, search_suffixes, and file_name
188 for (auto& prefix : search_prefixes) {
189 for (auto& suffix : search_suffixes) {
190 auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name));
191 libjvm_potential_paths.push_back(path);
192 }
193 }
194
195 return libjvm_potential_paths;
196}
197
198#ifndef _WIN32
199static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
200 void*& out_handle) {
201 std::vector<std::string> error_messages;
202
203 for (auto& i : potential_paths) {
204 i.make_preferred();
205 out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL);
206
207 if (out_handle != NULL) {
208 // std::cout << "Loaded " << i << std::endl;
209 break;
210 } else {
211 const char* err_msg = dlerror();
212 if (err_msg != NULL) {
213 error_messages.push_back(std::string(err_msg));
214 } else {
215 error_messages.push_back(std::string(" returned NULL"));
216 }
217 }
218 }
219
220 if (out_handle == NULL) {
221 return arrow::Status::IOError("Unable to load ", name);
222 }
223
224 return arrow::Status::OK();
225}
226
227#else
228static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
229 HINSTANCE& out_handle) {
230 std::vector<std::string> error_messages;
231
232 for (auto& i : potential_paths) {
233 i.make_preferred();
234 out_handle = LoadLibrary(i.string().c_str());
235
236 if (out_handle != NULL) {
237 break;
238 } else {
239 // error_messages.push_back(get_last_err_str(GetLastError()));
240 }
241 }
242
243 if (out_handle == NULL) {
244 return arrow::Status::IOError("Unable to load ", name);
245 }
246
247 return arrow::Status::OK();
248}
249#endif // _WIN32
250
// Looks up `symbol` in the shared library identified by `handle`.
//
// Returns nullptr when `handle` is null; otherwise returns whatever the
// platform lookup (dlsym, or GetProcAddress on Windows) yields.
static inline void* GetLibrarySymbol(void* handle, const char* symbol) {
  if (handle == nullptr) {
    return nullptr;
  }
#ifdef _WIN32
  return reinterpret_cast<void*>(
      GetProcAddress(reinterpret_cast<HINSTANCE>(handle), symbol));
#else
  return dlsym(handle, symbol);
#endif
}
266
// Binds SHIM->SYMBOL_NAME to the symbol of the same name found in SHIM->handle,
// unless the pointer is already set. If the pointer is still null afterwards,
// this returns Status::IOError from the *enclosing* function, so the macro can
// only be used inside functions that return Status.
#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME)                            \
  do {                                                                    \
    if (!(SHIM)->SYMBOL_NAME) {                                           \
      *reinterpret_cast<void**>(&(SHIM)->SYMBOL_NAME) =                   \
          GetLibrarySymbol((SHIM)->handle, "" #SYMBOL_NAME);              \
    }                                                                     \
    if (!(SHIM)->SYMBOL_NAME)                                             \
      return Status::IOError("Getting symbol " #SYMBOL_NAME " failed");   \
  } while (0)
276
// Binds SHIM->SYMBOL_NAME to the symbol of the same name found in SHIM->handle,
// unless the pointer is already set. A failed lookup leaves the pointer null;
// callers are expected to test it before use.
//
// Wrapped in do { } while (0) so that `GET_SYMBOL(...);` is a single statement
// and stays correct when used as the body of an if/else.
#define GET_SYMBOL(SHIM, SYMBOL_NAME)                          \
  do {                                                         \
    if (!(SHIM)->SYMBOL_NAME) {                                \
      *reinterpret_cast<void**>(&(SHIM)->SYMBOL_NAME) =        \
          GetLibrarySymbol((SHIM)->handle, "" #SYMBOL_NAME);   \
    }                                                          \
  } while (0)
282
283namespace arrow {
284namespace io {
285namespace internal {
286
// File-local shim instances: ConnectLibHdfs() hands out a pointer to
// libhdfs_shim and ConnectLibHdfs3() hands out a pointer to libhdfs3_shim.
static LibHdfsShim libhdfs_shim;
static LibHdfsShim libhdfs3_shim;
289
290hdfsBuilder* LibHdfsShim::NewBuilder(void) { return this->hdfsNewBuilder(); }
291
292void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
293 this->hdfsBuilderSetNameNode(bld, nn);
294}
295
296void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
297 this->hdfsBuilderSetNameNodePort(bld, port);
298}
299
300void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) {
301 this->hdfsBuilderSetUserName(bld, userName);
302}
303
304void LibHdfsShim::BuilderSetKerbTicketCachePath(hdfsBuilder* bld,
305 const char* kerbTicketCachePath) {
306 this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
307}
308
309void LibHdfsShim::BuilderSetForceNewInstance(hdfsBuilder* bld) {
310 this->hdfsBuilderSetForceNewInstance(bld);
311}
312
313hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) {
314 return this->hdfsBuilderConnect(bld);
315}
316
317int LibHdfsShim::BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val) {
318 return this->hdfsBuilderConfSetStr(bld, key, val);
319}
320
321int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); }
322
323hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
324 short replication, tSize blocksize) { // NOLINT
325 return this->hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
326}
327
328int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) {
329 return this->hdfsCloseFile(fs, file);
330}
331
332int LibHdfsShim::Exists(hdfsFS fs, const char* path) {
333 return this->hdfsExists(fs, path);
334}
335
336int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
337 return this->hdfsSeek(fs, file, desiredPos);
338}
339
340tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { return this->hdfsTell(fs, file); }
341
342tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
343 return this->hdfsRead(fs, file, buffer, length);
344}
345
346bool LibHdfsShim::HasPread() {
347 GET_SYMBOL(this, hdfsPread);
348 return this->hdfsPread != nullptr;
349}
350
351tSize LibHdfsShim::Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
352 tSize length) {
353 GET_SYMBOL(this, hdfsPread);
354 DCHECK(this->hdfsPread);
355 return this->hdfsPread(fs, file, position, buffer, length);
356}
357
358tSize LibHdfsShim::Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
359 return this->hdfsWrite(fs, file, buffer, length);
360}
361
362int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { return this->hdfsFlush(fs, file); }
363
364int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) {
365 GET_SYMBOL(this, hdfsAvailable);
366 if (this->hdfsAvailable)
367 return this->hdfsAvailable(fs, file);
368 else
369 return 0;
370}
371
372int LibHdfsShim::Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
373 GET_SYMBOL(this, hdfsCopy);
374 if (this->hdfsCopy)
375 return this->hdfsCopy(srcFS, src, dstFS, dst);
376 else
377 return 0;
378}
379
380int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
381 GET_SYMBOL(this, hdfsMove);
382 if (this->hdfsMove)
383 return this->hdfsMove(srcFS, src, dstFS, dst);
384 else
385 return 0;
386}
387
388int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) {
389 return this->hdfsDelete(fs, path, recursive);
390}
391
392int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) {
393 GET_SYMBOL(this, hdfsRename);
394 if (this->hdfsRename)
395 return this->hdfsRename(fs, oldPath, newPath);
396 else
397 return 0;
398}
399
400char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
401 GET_SYMBOL(this, hdfsGetWorkingDirectory);
402 if (this->hdfsGetWorkingDirectory) {
403 return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize);
404 } else {
405 return NULL;
406 }
407}
408
409int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) {
410 GET_SYMBOL(this, hdfsSetWorkingDirectory);
411 if (this->hdfsSetWorkingDirectory) {
412 return this->hdfsSetWorkingDirectory(fs, path);
413 } else {
414 return 0;
415 }
416}
417
418int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) {
419 return this->hdfsCreateDirectory(fs, path);
420}
421
422int LibHdfsShim::SetReplication(hdfsFS fs, const char* path, int16_t replication) {
423 GET_SYMBOL(this, hdfsSetReplication);
424 if (this->hdfsSetReplication) {
425 return this->hdfsSetReplication(fs, path, replication);
426 } else {
427 return 0;
428 }
429}
430
431hdfsFileInfo* LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) {
432 return this->hdfsListDirectory(fs, path, numEntries);
433}
434
435hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) {
436 return this->hdfsGetPathInfo(fs, path);
437}
438
439void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
440 this->hdfsFreeFileInfo(hdfsFileInfo, numEntries);
441}
442
443char*** LibHdfsShim::GetHosts(hdfsFS fs, const char* path, tOffset start,
444 tOffset length) {
445 GET_SYMBOL(this, hdfsGetHosts);
446 if (this->hdfsGetHosts) {
447 return this->hdfsGetHosts(fs, path, start, length);
448 } else {
449 return NULL;
450 }
451}
452
453void LibHdfsShim::FreeHosts(char*** blockHosts) {
454 GET_SYMBOL(this, hdfsFreeHosts);
455 if (this->hdfsFreeHosts) {
456 this->hdfsFreeHosts(blockHosts);
457 }
458}
459
460tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) {
461 GET_SYMBOL(this, hdfsGetDefaultBlockSize);
462 if (this->hdfsGetDefaultBlockSize) {
463 return this->hdfsGetDefaultBlockSize(fs);
464 } else {
465 return 0;
466 }
467}
468
469tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { return this->hdfsGetCapacity(fs); }
470
471tOffset LibHdfsShim::GetUsed(hdfsFS fs) { return this->hdfsGetUsed(fs); }
472
473int LibHdfsShim::Chown(hdfsFS fs, const char* path, const char* owner,
474 const char* group) {
475 return this->hdfsChown(fs, path, owner, group);
476}
477
478int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT
479 return this->hdfsChmod(fs, path, mode);
480}
481
482int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
483 GET_SYMBOL(this, hdfsUtime);
484 if (this->hdfsUtime) {
485 return this->hdfsUtime(fs, path, mtime, atime);
486 } else {
487 return 0;
488 }
489}
490
// Binds every function pointer that the shim calls without a null check.
//
// Each GET_SYMBOL_REQUIRED line looks the symbol up in this->handle (unless it
// is already bound) and returns an IOError from this function as soon as one
// lookup fails, so the first missing symbol in the order below decides the
// error that is reported. Returns Status::OK() when all symbols are bound.
Status LibHdfsShim::GetRequiredSymbols() {
  // Builder, connection and filesystem-level methods
  GET_SYMBOL_REQUIRED(this, hdfsNewBuilder);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect);
  GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory);
  GET_SYMBOL_REQUIRED(this, hdfsDelete);
  GET_SYMBOL_REQUIRED(this, hdfsDisconnect);
  GET_SYMBOL_REQUIRED(this, hdfsExists);
  GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo);
  GET_SYMBOL_REQUIRED(this, hdfsGetCapacity);
  GET_SYMBOL_REQUIRED(this, hdfsGetUsed);
  GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo);
  GET_SYMBOL_REQUIRED(this, hdfsListDirectory);
  GET_SYMBOL_REQUIRED(this, hdfsChown);
  GET_SYMBOL_REQUIRED(this, hdfsChmod);

  // File methods
  GET_SYMBOL_REQUIRED(this, hdfsCloseFile);
  GET_SYMBOL_REQUIRED(this, hdfsFlush);
  GET_SYMBOL_REQUIRED(this, hdfsOpenFile);
  GET_SYMBOL_REQUIRED(this, hdfsRead);
  GET_SYMBOL_REQUIRED(this, hdfsSeek);
  GET_SYMBOL_REQUIRED(this, hdfsTell);
  GET_SYMBOL_REQUIRED(this, hdfsWrite);

  return Status::OK();
}
523
524Status ConnectLibHdfs(LibHdfsShim** driver) {
525 static std::mutex lock;
526 std::lock_guard<std::mutex> guard(lock);
527
528 LibHdfsShim* shim = &libhdfs_shim;
529
530 static bool shim_attempted = false;
531 if (!shim_attempted) {
532 shim_attempted = true;
533
534 shim->Initialize();
535
536 std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths();
537 RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle));
538
539 std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths();
540 RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", shim->handle));
541 } else if (shim->handle == nullptr) {
542 return Status::IOError("Prior attempt to load libhdfs failed");
543 }
544
545 *driver = shim;
546 return shim->GetRequiredSymbols();
547}
548
549Status ConnectLibHdfs3(LibHdfsShim** driver) {
550 static std::mutex lock;
551 std::lock_guard<std::mutex> guard(lock);
552
553 LibHdfsShim* shim = &libhdfs3_shim;
554
555 static bool shim_attempted = false;
556 if (!shim_attempted) {
557 shim_attempted = true;
558
559 shim->Initialize();
560
561 std::vector<fs::path> libhdfs3_potential_paths = get_potential_libhdfs3_paths();
562 RETURN_NOT_OK(try_dlopen(libhdfs3_potential_paths, "libhdfs3", shim->handle));
563 } else if (shim->handle == nullptr) {
564 return Status::IOError("Prior attempt to load libhdfs3 failed");
565 }
566
567 *driver = shim;
568 return shim->GetRequiredSymbols();
569}
570
571} // namespace internal
572} // namespace io
573} // namespace arrow
574