1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "DirectoryIterator.h"
29#include "Exception.h"
30#include "ExceptionInternal.h"
31#include "FileSystem.h"
32#include "FileSystemImpl.h"
33#include "FileSystemKey.h"
34#include "Hash.h"
35#include "SessionConfig.h"
36#include "Thread.h"
37#include "Token.h"
38#include "Unordered.h"
39#include "WritableUtils.h"
40
41#include <algorithm>
42#include <string>
43#if WITH_KERBEROS
44#include <krb5/krb5.h>
45#endif
46
47using namespace Hdfs::Internal;
48
49namespace Hdfs {
50
51namespace Internal {
52
53static std::string ExtractPrincipalFromTicketCache(
54 const std::string & cachePath) {
55#if WITH_KERBEROS
56 krb5_context cxt = NULL;
57 krb5_ccache ccache = NULL;
58 krb5_principal principal = NULL;
59 krb5_error_code ec = 0;
60 std::string errmsg, retval;
61 char * priName = NULL;
62
63 if (!cachePath.empty()) {
64 if (0 != setenv("KRB5CCNAME", cachePath.c_str(), 1)) {
65 THROW(HdfsIOException, "Cannot set env parameter \"KRB5CCNAME\"");
66 }
67 }
68
69 do {
70 if (0 != (ec = krb5_init_context(&cxt))) {
71 break;
72 }
73
74 if (0 != (ec = krb5_cc_default(cxt, &ccache))) {
75 break;
76 }
77
78 if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) {
79 break;
80 }
81
82 if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) {
83 break;
84 }
85 } while (0);
86
87 if (!ec) {
88 retval = priName;
89 } else {
90 if (cxt) {
91 errmsg = krb5_get_error_message(cxt, ec);
92 } else {
93 errmsg = "Cannot initialize kerberos context";
94 }
95 }
96
97 if (priName != NULL) {
98 krb5_free_unparsed_name(cxt, priName);
99 }
100
101 if (principal != NULL) {
102 krb5_free_principal(cxt, principal);
103 }
104
105 if (ccache != NULL) {
106 krb5_cc_close(cxt, ccache);
107 }
108
109 if (cxt != NULL) {
110 krb5_free_context(cxt);
111 }
112
113 if (!errmsg.empty()) {
114 THROW(HdfsIOException,
115 "FileSystem: Failed to extract principal from ticket cache: %s",
116 errmsg.c_str());
117 }
118
119 return retval;
120#else
121 THROW(HdfsIOException, "libhdfs3 built without kerberos support");
122#endif
123
124}
125
126
127static std::string ExtractPrincipalFromToken(const Token & token) {
128 std::string realUser, owner;
129 std::string identifier = token.getIdentifier();
130 WritableUtils cin(&identifier[0], identifier.size());
131 char version;
132
133 try {
134 version = cin.readByte();
135
136 if (version != 0) {
137 THROW(HdfsIOException, "Unknown version of delegation token");
138 }
139
140 owner = cin.ReadText();
141 cin.ReadText();
142 realUser = cin.ReadText();
143 return realUser.empty() ? owner : realUser;
144 } catch (const std::range_error & e) {
145 }
146
147 THROW(HdfsIOException, "Cannot extract principal from token");
148}
149}
150
151FileSystem::FileSystem(const Config & conf) :
152 conf(conf), impl(NULL) {
153}
154
155FileSystem::FileSystem(const FileSystem & other) :
156 conf(other.conf), impl(NULL) {
157 if (other.impl) {
158 impl = new FileSystemWrapper(other.impl->filesystem);
159 }
160}
161
162FileSystem & FileSystem::operator =(const FileSystem & other) {
163 if (this == &other) {
164 return *this;
165 }
166
167 conf = other.conf;
168
169 if (impl) {
170 delete impl;
171 impl = NULL;
172 }
173
174 if (other.impl) {
175 impl = new FileSystemWrapper(other.impl->filesystem);
176 }
177
178 return *this;
179}
180
181FileSystem::~FileSystem() {
182 if (impl) {
183 try {
184 disconnect();
185 } catch (...) {
186 }
187 }
188}
189
190void FileSystem::connect() {
191 Internal::SessionConfig sconf(conf);
192 connect(sconf.getDefaultUri().c_str(), NULL, NULL);
193}
194
195/**
196 * Connect to hdfs
197 * @param uri hdfs connection uri, hdfs://host:port
198 */
199void FileSystem::connect(const char * uri) {
200 connect(uri, NULL, NULL);
201}
202
203static FileSystemWrapper * ConnectInternal(const char * uri,
204 const std::string & principal, const Token * token, Config & conf) {
205 if (NULL == uri || 0 == strlen(uri)) {
206 THROW(InvalidParameter, "Invalid HDFS uri.");
207 }
208
209 FileSystemKey key(uri, principal.c_str());
210
211 if (token) {
212 key.addToken(*token);
213 }
214
215 return new FileSystemWrapper(shared_ptr<FileSystemInter>(new FileSystemImpl(key, conf)));
216}
217
218/**
219 * Connect to hdfs with user or token
220 * username and token cannot be set at the same time
221 * @param uri connection uri.
222 * @param username user used to connect to hdfs
223 * @param token token used to connect to hdfs
224 */
225void FileSystem::connect(const char * uri, const char * username, const char * token) {
226 AuthMethod auth;
227 std::string principal;
228
229 if (impl) {
230 THROW(HdfsIOException, "FileSystem: already connected.");
231 }
232
233 try {
234 SessionConfig sconf(conf);
235 auth = RpcAuth::ParseMethod(sconf.getRpcAuthMethod());
236
237 if (token && auth != AuthMethod::SIMPLE) {
238 Token t;
239 t.fromString(token);
240 principal = ExtractPrincipalFromToken(t);
241 impl = ConnectInternal(uri, principal, &t, conf);
242 impl->filesystem->connect();
243 return;
244 } else if (username) {
245 principal = username;
246 }
247
248 if (auth == AuthMethod::KERBEROS) {
249 principal = ExtractPrincipalFromTicketCache(sconf.getKerberosCachePath());
250 }
251
252 impl = ConnectInternal(uri, principal, NULL, conf);
253 impl->filesystem->connect();
254 } catch (...) {
255 delete impl;
256 impl = NULL;
257 throw;
258 }
259}
260
261/**
262 * disconnect from hdfs
263 */
264void FileSystem::disconnect() {
265 delete impl;
266 impl = NULL;
267}
268
269/**
270 * To get default number of replication.
271 * @return the default number of replication.
272 */
273int FileSystem::getDefaultReplication() const {
274 if (!impl) {
275 THROW(HdfsIOException, "FileSystem: not connected.");
276 }
277
278 return impl->filesystem->getDefaultReplication();
279}
280
281/**
282 * To get the default number of block size.
283 * @return the default block size.
284 */
285int64_t FileSystem::getDefaultBlockSize() const {
286 if (!impl) {
287 THROW(HdfsIOException, "FileSystem: not connected.");
288 }
289
290 return impl->filesystem->getDefaultBlockSize();
291}
292
293/**
294 * To get the home directory.
295 * @return home directory.
296 */
297std::string FileSystem::getHomeDirectory() const {
298 if (!impl) {
299 THROW(HdfsIOException, "FileSystem: not connected.");
300 }
301
302 return impl->filesystem->getHomeDirectory();
303}
304
305/**
306 * To delete a file or directory.
307 * @param path the path to be deleted.
308 * @param recursive if path is a directory, delete the contents recursively.
309 * @return return true if success.
310 */
311bool FileSystem::deletePath(const char * path, bool recursive) {
312 if (!impl) {
313 THROW(HdfsIOException, "FileSystem: not connected.");
314 }
315
316 return impl->filesystem->deletePath(path, recursive);
317}
318
319/**
320 * To create a directory which given permission.
321 * @param path the directory path which is to be created.
322 * @param permission directory permission.
323 * @return return true if success.
324 */
325bool FileSystem::mkdir(const char * path, const Permission & permission) {
326 if (!impl) {
327 THROW(HdfsIOException, "FileSystem: not connected.");
328 }
329
330 return impl->filesystem->mkdir(path, permission);
331}
332
333/**
334 * To create a directory which given permission.
335 * If parent path does not exits, create it.
336 * @param path the directory path which is to be created.
337 * @param permission directory permission.
338 * @return return true if success.
339 */
340bool FileSystem::mkdirs(const char * path, const Permission & permission) {
341 if (!impl) {
342 THROW(HdfsIOException, "FileSystem: not connected.");
343 }
344
345 return impl->filesystem->mkdirs(path, permission);
346}
347
348/**
349 * To get path information.
350 * @param path the path which information is to be returned.
351 * @return the path information.
352 */
353FileStatus FileSystem::getFileStatus(const char * path) const {
354 if (!impl) {
355 THROW(HdfsIOException, "FileSystem: not connected.");
356 }
357
358 return impl->filesystem->getFileStatus(path);
359}
360
361/**
362 * Return an array containing hostnames, offset and size of
363 * portions of the given file.
364 *
365 * This call is most helpful with DFS, where it returns
366 * hostnames of machines that contain the given file.
367 *
368 * The FileSystem will simply return an elt containing 'localhost'.
369 *
370 * @param path path is used to identify an FS since an FS could have
371 * another FS that it could be delegating the call to
372 * @param start offset into the given file
373 * @param len length for which to get locations for
374 */
375std::vector<BlockLocation> FileSystem::getFileBlockLocations(const char * path,
376 int64_t start, int64_t len) {
377 if (!impl) {
378 THROW(HdfsIOException, "FileSystem: not connected.");
379 }
380
381 return impl->filesystem->getFileBlockLocations(path, start, len);
382}
383
384/**
385 * list the contents of a directory.
386 * @param path the directory path.
387 * @return Return a iterator to visit all elements in this directory.
388 */
389DirectoryIterator FileSystem::listDirectory(const char * path) {
390 if (!impl) {
391 THROW(HdfsIOException, "FileSystem: not connected.");
392 }
393
394 return impl->filesystem->listDirectory(path, false);
395}
396
397/**
398 * list all the contents of a directory.
399 * @param path The directory path.
400 * @return Return a vector of file informations in the directory.
401 */
402std::vector<FileStatus> FileSystem::listAllDirectoryItems(const char * path) {
403 if (!impl) {
404 THROW(HdfsIOException, "FileSystem: not connected.");
405 }
406
407 return impl->filesystem->listAllDirectoryItems(path, false);
408}
409
410/**
411 * To set the owner and the group of the path.
412 * username and groupname cannot be empty at the same time.
413 * @param path the path which owner of group is to be changed.
414 * @param username new user name.
415 * @param groupname new group.
416 */
417void FileSystem::setOwner(const char * path, const char * username,
418 const char * groupname) {
419 if (!impl) {
420 THROW(HdfsIOException, "FileSystem: not connected.");
421 }
422
423 impl->filesystem->setOwner(path, username, groupname);
424}
425
426/**
427 * To set the access time or modification time of a path.
428 * @param path the path which access time or modification time is to be changed.
429 * @param mtime new modification time.
430 * @param atime new access time.
431 */
432void FileSystem::setTimes(const char * path, int64_t mtime, int64_t atime) {
433 if (!impl) {
434 THROW(HdfsIOException, "FileSystem: not connected.");
435 }
436
437 impl->filesystem->setTimes(path, mtime, atime);
438}
439
440/**
441 * To set the permission of a path.
442 * @param path the path which permission is to be changed.
443 * @param permission new permission.
444 */
445void FileSystem::setPermission(const char * path,
446 const Permission & permission) {
447 if (!impl) {
448 THROW(HdfsIOException, "FileSystem: not connected.");
449 }
450
451 impl->filesystem->setPermission(path, permission);
452}
453
454/**
455 * To set the number of replication.
456 * @param path the path which number of replication is to be changed.
457 * @param replication new number of replication.
458 * @return return true if success.
459 */
460bool FileSystem::setReplication(const char * path, short replication) {
461 if (!impl) {
462 THROW(HdfsIOException, "FileSystem: not connected.");
463 }
464
465 return impl->filesystem->setReplication(path, replication);
466}
467
468/**
469 * To rename a path.
470 * @param src old path.
471 * @param dst new path.
472 * @return return true if success.
473 */
474bool FileSystem::rename(const char * src, const char * dst) {
475 if (!impl) {
476 THROW(HdfsIOException, "FileSystem: not connected.");
477 }
478
479 return impl->filesystem->rename(src, dst);
480}
481
482/**
483 * To set working directory.
484 * @param path new working directory.
485 */
486void FileSystem::setWorkingDirectory(const char * path) {
487 if (!impl) {
488 THROW(HdfsIOException, "FileSystem: not connected.");
489 }
490
491 impl->filesystem->setWorkingDirectory(path);
492}
493
494/**
495 * To get working directory.
496 * @return working directory.
497 */
498std::string FileSystem::getWorkingDirectory() const {
499 if (!impl) {
500 THROW(HdfsIOException, "FileSystem: not connected.");
501 }
502
503 return impl->filesystem->getWorkingDirectory();
504}
505
506/**
507 * To test if the path exist.
508 * @param path the path which is to be tested.
509 * @return return true if the path exist.
510 */
511bool FileSystem::exist(const char * path) const {
512 if (!impl) {
513 THROW(HdfsIOException, "FileSystem: not connected.");
514 }
515
516 return impl->filesystem->exist(path);
517}
518
519/**
520 * To get the file system status.
521 * @return the file system status.
522 */
523FileSystemStats FileSystem::getStats() const {
524 if (!impl) {
525 THROW(HdfsIOException, "FileSystem: not connected.");
526 }
527
528 return impl->filesystem->getFsStats();
529}
530
531/**
532 * Truncate the file in the indicated path to the indicated size.
533 * @param src The path to the file to be truncated
534 * @param size The size the file is to be truncated to
535 *
536 * @return true if and client does not need to wait for block recovery,
537 * false if client needs to wait for block recovery.
538 */
539bool FileSystem::truncate(const char * src, int64_t size) {
540 if (!impl) {
541 THROW(HdfsIOException, "FileSystem: not connected.");
542 }
543
544 return impl->filesystem->truncate(src, size);
545}
546
547std::string FileSystem::getDelegationToken(const char * renewer) {
548 if (!impl) {
549 THROW(HdfsIOException, "FileSystem: not connected.");
550 }
551
552 return impl->filesystem->getDelegationToken(renewer);
553}
554
555/**
556 * Get a valid Delegation Token using the default user as renewer.
557 *
558 * @return Token
559 * @throws IOException
560 */
561std::string FileSystem::getDelegationToken() {
562 if (!impl) {
563 THROW(HdfsIOException, "FileSystem: not connected.");
564 }
565
566 return impl->filesystem->getDelegationToken();
567}
568
569/**
570 * Renew an existing delegation token.
571 *
572 * @param token delegation token obtained earlier
573 * @return the new expiration time
574 * @throws IOException
575 */
576int64_t FileSystem::renewDelegationToken(const std::string & token) {
577 if (!impl) {
578 THROW(HdfsIOException, "FileSystem: not connected.");
579 }
580
581 return impl->filesystem->renewDelegationToken(token);
582}
583
584/**
585 * Cancel an existing delegation token.
586 *
587 * @param token delegation token
588 * @throws IOException
589 */
590void FileSystem::cancelDelegationToken(const std::string & token) {
591 if (!impl) {
592 THROW(HdfsIOException, "FileSystem: not connected.");
593 }
594
595 impl->filesystem->cancelDelegationToken(token);
596}
597
598}
599