1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "Atomic.h"
29#include "BlockLocation.h"
30#include "DirectoryIterator.h"
31#include "Exception.h"
32#include "ExceptionInternal.h"
33#include "FileStatus.h"
34#include "FileSystemImpl.h"
35#include "FileSystemStats.h"
36#include "InputStream.h"
37#include "LeaseRenewer.h"
38#include "Logger.h"
39#include "OutputStream.h"
40#include "OutputStreamImpl.h"
41#include "server/LocatedBlocks.h"
42#include "server/NamenodeInfo.h"
43#include "server/NamenodeProxy.h"
44#include "StringUtil.h"
45
46#include <cstring>
47#include <inttypes.h>
48#include <libxml/uri.h>
49#include <strings.h>
50
51namespace Hdfs {
52namespace Internal {
53
/**
 * Resolve a path against a base directory.
 * @param prefix the directory a relative path is appended to.
 * @param path the path to resolve, may be empty.
 * @return prefix if path is empty, path itself if it starts with '/',
 *         otherwise prefix and path joined by a '/'.
 */
static const std::string GetAbsPath(const std::string & prefix,
                                    const std::string & path) {
    if (path.empty()) {
        return prefix;
    }

    const bool absolute = (path[0] == '/');
    return absolute ? path : prefix + "/" + path;
}
66
67/*
68 * Return the canonical absolute name of file NAME.
69 * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/')
70 */
71static const std::string CanonicalizePath(const std::string & path) {
72 int skip = 0;
73 std::string retval;
74 std::vector<std::string> components = StringSplit(path, "/");
75 std::deque<std::string> tmp;
76 std::vector<std::string>::reverse_iterator s = components.rbegin();
77
78 while (s != components.rend()) {
79 if (s->empty() || *s == ".") {
80 ++s;
81 } else if (*s == "..") {
82 ++skip;
83 ++s;
84 } else {
85 if (skip <= 0) {
86 tmp.push_front(*s);
87 } else {
88 --skip;
89 }
90
91 ++s;
92 }
93 }
94
95 for (size_t i = 0; i < tmp.size(); ++i) {
96 retval += "/";
97 retval += tmp[i];
98 }
99
100 return retval.empty() ? "/" : retval;
101}
102
103FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c)
104 : conf(c),
105 key(key),
106 openedOutputStream(0),
107 nn(NULL),
108 sconf(c),
109 user(key.getUser()) {
110 static atomic<uint32_t> count(0);
111 std::stringstream ss;
112 ss.imbue(std::locale::classic());
113 srand((unsigned int) time(NULL));
114 ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_"
115 << getpid() << "_tid_" << pthread_self();
116 clientName = ss.str();
117 workingDir = std::string("/user/") + user.getEffectiveUser();
118 peerCache = shared_ptr<PeerCache>(new PeerCache(sconf));
119#ifdef MOCK
120 stub = NULL;
121#endif
122 //set log level
123 RootLogger.setLogSeverity(sconf.getLogSeverity());
124}
125
126/**
127 * Destroy a FileSystemBase instance
128 */
129FileSystemImpl::~FileSystemImpl() {
130 try {
131 disconnect();
132 } catch (...) {
133 }
134}
135
136const std::string FileSystemImpl::getStandardPath(const char * path) {
137 std::string base;
138 {
139 lock_guard<mutex> lock(mutWorkingDir);
140 base = workingDir;
141 }
142 return CanonicalizePath(GetAbsPath(base, path));
143}
144
145const char * FileSystemImpl::getClientName() {
146 return clientName.c_str();
147}
148
149void FileSystemImpl::connect() {
150 std::string host, port, uri;
151 std::vector<NamenodeInfo> namenodeInfos;
152
153 if (nn) {
154 THROW(HdfsIOException, "FileSystemImpl: already connected.");
155 }
156
157 host = key.getHost();
158 port = key.getPort();
159 uri += key.getScheme() + "://" + host;
160
161 if (port.empty()) {
162 try {
163 namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf);
164 } catch (const HdfsConfigNotFound & e) {
165 NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration", uri.c_str());
166 }
167
168 tokenService = "ha-hdfs:";
169 tokenService += host;
170 } else {
171 std::stringstream ss;
172 ss.imbue(std::locale::classic());
173 ss << host << ":" << port;
174 namenodeInfos.resize(1);
175 namenodeInfos[0].setRpcAddr(ss.str());
176 tokenService = namenodeInfos[0].getRpcAddr();
177 }
178
179#ifdef MOCK
180 nn = stub->getNamenode();
181#else
182 nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod())));
183#endif
184 /*
185 * To test if the connection is ok
186 */
187 getFsStats();
188}
189
190/**
191 * disconnect from hdfs
192 */
193void FileSystemImpl::disconnect() {
194 if (nn) {
195 nn->close();
196 delete nn;
197 }
198
199 nn = NULL;
200}
201
202/**
203 * To get default number of replication.
204 * @return the default number of replication.
205 */
206int FileSystemImpl::getDefaultReplication() const {
207 return sconf.getDefaultReplica();
208}
209
210/**
211 * To get the default number of block size.
212 * @return the default block size.
213 */
214int64_t FileSystemImpl::getDefaultBlockSize() const {
215 return sconf.getDefaultBlockSize();
216}
217
218/**
219 * To get the home directory.
220 * @return home directory.
221 */
222std::string FileSystemImpl::getHomeDirectory() const {
223 return std::string("/user/") + user.getEffectiveUser();
224}
225
226/**
227 * To delete a file or directory.
228 * @param path the path to be deleted.
229 * @param recursive if path is a directory, delete the contents recursively.
230 * @return return true if success.
231 */
232
233bool FileSystemImpl::deletePath(const char * path, bool recursive) {
234 if (!nn) {
235 THROW(HdfsIOException, "FileSystemImpl: not connected.");
236 }
237
238 if (NULL == path || !strlen(path)) {
239 THROW(InvalidParameter, "Invalid input: path should not be empty");
240 }
241
242 return nn->deleteFile(getStandardPath(path), recursive);
243}
244
245/**
246 * To create a directory which given permission.
247 * @param path the directory path which is to be created.
248 * @param permission directory permission.
249 * @return return true if success.
250 */
251
252bool FileSystemImpl::mkdir(const char * path, const Permission & permission) {
253 if (!nn) {
254 THROW(HdfsIOException, "FileSystemImpl: not connected.");
255 }
256
257 if (NULL == path || !strlen(path)) {
258 THROW(InvalidParameter, "Invalid input: path should not be empty");
259 }
260
261 return nn->mkdirs(getStandardPath(path), permission, false);
262}
263
264/**
265 * To create a directory which given permission.
266 * If parent path does not exits, create it.
267 * @param path the directory path which is to be created.
268 * @param permission directory permission.
269 * @return return true if success.
270 */
271
272bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) {
273 if (!nn) {
274 THROW(HdfsIOException, "FileSystemImpl: not connected.");
275 }
276
277 if (NULL == path || !strlen(path)) {
278 THROW(InvalidParameter, "Invalid input: path should not be empty");
279 }
280
281 return nn->mkdirs(getStandardPath(path), permission, true);
282}
283
284/**
285 * To get path information.
286 * @param path the path which information is to be returned.
287 * @return the path information.
288 */
289FileStatus FileSystemImpl::getFileStatus(const char * path) {
290 if (!nn) {
291 THROW(HdfsIOException, "FileSystemImpl: not connected.");
292 }
293
294 if (NULL == path || !strlen(path)) {
295 THROW(InvalidParameter, "Invalid input: path should not be empty");
296 }
297
298 return nn->getFileInfo(getStandardPath(path));
299}
300
301static void Convert(BlockLocation & bl, const LocatedBlock & lb) {
302 const std::vector<DatanodeInfo> & nodes = lb.getLocations();
303 bl.setCorrupt(lb.isCorrupt());
304 bl.setLength(lb.getNumBytes());
305 bl.setOffset(lb.getOffset());
306 std::vector<std::string> hosts(nodes.size());
307 std::vector<std::string> names(nodes.size());
308 std::vector<std::string> topologyPaths(nodes.size());
309
310 for (size_t i = 0 ; i < nodes.size() ; ++i) {
311 hosts[i] = nodes[i].getHostName();
312 names[i] = nodes[i].getXferAddr();
313 topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr();
314 }
315
316 bl.setNames(names);
317 bl.setHosts(hosts);
318 bl.setTopologyPaths(topologyPaths);
319}
320
321std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations(
322 const char * path, int64_t start, int64_t len) {
323 if (!nn) {
324 THROW(HdfsIOException, "FileSystemImpl: not connected.");
325 }
326
327 if (NULL == path || !strlen(path)) {
328 THROW(InvalidParameter, "Invalid input: path should not be empty");
329 }
330
331 if (start < 0) {
332 THROW(InvalidParameter, "Invalid input: start offset should be positive");
333 }
334
335 if (len < 0) {
336 THROW(InvalidParameter, "Invalid input: length should be positive");
337 }
338
339 LocatedBlocksImpl lbs;
340 nn->getBlockLocations(getStandardPath(path), start, len, lbs);
341 std::vector<LocatedBlock> blocks = lbs.getBlocks();
342 std::vector<BlockLocation> retval(blocks.size());
343
344 for (size_t i = 0; i < blocks.size(); ++i) {
345 Convert(retval[i], blocks[i]);
346 }
347
348 return retval;
349}
350
351/**
352 * list the contents of a directory.
353 * @param path the directory path.
354 * @return return the path informations in the given directory.
355 */
356DirectoryIterator FileSystemImpl::listDirectory(const char * path,
357 bool needLocation) {
358 if (!nn) {
359 THROW(HdfsIOException, "FileSystemImpl: not connected.");
360 }
361
362 if (NULL == path || !strlen(path)) {
363 THROW(InvalidParameter, "Invalid input: path should not be empty");
364 }
365
366 return DirectoryIterator(this, getStandardPath(path), needLocation);
367}
368
369/**
370 * list all the contents of a directory.
371 * @param path The directory path.
372 * @return Return a vector of file informations in the directory.
373 */
374std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path,
375 bool needLocation) {
376 if (!nn) {
377 THROW(HdfsIOException, "FileSystemImpl: not connected.");
378 }
379
380 if (NULL == path || !strlen(path)) {
381 THROW(InvalidParameter, "Invalid input: path should not be empty");
382 }
383
384 std::string startAfter;
385 std::string p = getStandardPath(path);
386 std::vector<FileStatus> retval;
387
388 while (getListing(p, startAfter, needLocation, retval)) {
389 startAfter = retval.back().getPath();
390 }
391
392 return retval;
393}
394
395/**
396 * To set the owner and the group of the path.
397 * username and groupname cannot be empty at the same time.
398 * @param path the path which owner of group is to be changed.
399 * @param username new user name.
400 * @param groupname new group.
401 */
402void FileSystemImpl::setOwner(const char * path, const char * username,
403 const char * groupname) {
404 if (!nn) {
405 THROW(HdfsIOException, "FileSystemImpl: not connected.");
406 }
407
408 if (NULL == path || !strlen(path)) {
409 THROW(InvalidParameter, "Invalid input: path should not be empty");
410 }
411
412 if ((NULL == username || !strlen(username))
413 && (NULL == groupname || !strlen(groupname))) {
414 THROW(InvalidParameter,
415 "Invalid input: username and groupname should not be empty");
416 }
417
418 nn->setOwner(getStandardPath(path), username != NULL ? username : "",
419 groupname != NULL ? groupname : "");
420}
421
422/**
423 * To set the access time or modification time of a path.
424 * @param path the path which access time or modification time is to be changed.
425 * @param mtime new modification time.
426 * @param atime new access time.
427 */
428void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) {
429 if (!nn) {
430 THROW(HdfsIOException, "FileSystemImpl: not connected.");
431 }
432
433 if (NULL == path || !strlen(path)) {
434 THROW(InvalidParameter, "Invalid input: path should not be empty");
435 }
436
437 nn->setTimes(getStandardPath(path), mtime, atime);
438}
439
440/**
441 * To set the permission of a path.
442 * @param path the path which permission is to be changed.
443 * @param permission new permission.
444 */
445void FileSystemImpl::setPermission(const char * path,
446 const Permission & permission) {
447 if (!nn) {
448 THROW(HdfsIOException, "FileSystemImpl: not connected.");
449 }
450
451 if (NULL == path || !strlen(path)) {
452 THROW(InvalidParameter, "Invalid input: path should not be empty");
453 }
454
455 nn->setPermission(getStandardPath(path), permission);
456}
457
458/**
459 * To set the number of replication.
460 * @param path the path which number of replication is to be changed.
461 * @param replication new number of replication.
462 * @return return true if success.
463 */
464
465bool FileSystemImpl::setReplication(const char * path, short replication) {
466 if (!nn) {
467 THROW(HdfsIOException, "FileSystemImpl: not connected.");
468 }
469
470 if (NULL == path || !strlen(path)) {
471 THROW(InvalidParameter, "Invalid input: path should not be empty");
472 }
473
474 return nn->setReplication(getStandardPath(path), replication);
475}
476
477/**
478 * To rename a path.
479 * @param src old path.
480 * @param dst new path.
481 * @return return true if success.
482 */
483
484bool FileSystemImpl::rename(const char * src, const char * dst) {
485 if (!nn) {
486 THROW(HdfsIOException, "FileSystemImpl: not connected.");
487 }
488
489 if (NULL == src || !strlen(src)) {
490 THROW(InvalidParameter, "Invalid input: src should not be empty");
491 }
492
493 if (NULL == dst || !strlen(dst)) {
494 THROW(InvalidParameter, "Invalid input: dst should not be empty");
495 }
496
497 return nn->rename(getStandardPath(src), getStandardPath(dst));
498}
499
500/**
501 * To set working directory.
502 * @param path new working directory.
503 */
504void FileSystemImpl::setWorkingDirectory(const char * path) {
505 if (NULL == path) {
506 THROW(InvalidParameter, "Invalid input: path should not be empty");
507 }
508
509 if (!strlen(path) || '/' != path[0]) {
510 THROW(InvalidParameter,
511 "Invalid input: path should be an absolute path");
512 }
513
514 lock_guard<mutex> lock(mutWorkingDir);
515 workingDir = path;
516}
517
518/**
519 * To get working directory.
520 * @return working directory.
521 */
522std::string FileSystemImpl::getWorkingDirectory() const {
523 return workingDir;
524}
525
526/**
527 * To test if the path exist.
528 * @param path the path which is to be tested.
529 * @return return true if the path exist.
530 */
531
532bool FileSystemImpl::exist(const char * path) {
533 if (!nn) {
534 THROW(HdfsIOException, "FileSystemImpl: not connected.");
535 }
536
537 if (NULL == path || !strlen(path)) {
538 THROW(InvalidParameter, "Invalid input: path should not be empty");
539 }
540
541 try {
542 getFileStatus(path);
543 } catch (const FileNotFoundException & e) {
544 return false;
545 }
546
547 return true;
548}
549
550/**
551 * To get the file system status.
552 * @return the file system status.
553 */
554FileSystemStats FileSystemImpl::getFsStats() {
555 if (!nn) {
556 THROW(HdfsIOException, "FileSystemImpl: not connected.");
557 }
558
559 std::vector<int64_t> retval = nn->getFsStats();
560 assert(retval.size() >= 3);
561 return FileSystemStats(retval[0], retval[1], retval[2]);
562}
563
564/**
565 * Truncate the file in the indicated path to the indicated size.
566 * @param path The path to the file to be truncated
567 * @param size The size the file is to be truncated to
568 *
569 * @return true if and client does not need to wait for block recovery,
570 * false if client needs to wait for block recovery.
571 */
572bool FileSystemImpl::truncate(const char * path, int64_t size) {
573 LOG(DEBUG1, "truncate file %s to length %" PRId64, path, size);
574
575 if (!nn) {
576 THROW(HdfsIOException, "FileSystemImpl: not connected.");
577 }
578
579 if (NULL == path || !strlen(path)) {
580 THROW(InvalidParameter, "Invalid input: src should not be empty.");
581 }
582
583 std::string absPath = getStandardPath(path);
584
585 return nn->truncate(absPath, size, clientName);
586}
587
588std::string FileSystemImpl::getDelegationToken(const char * renewer) {
589 if (!nn) {
590 THROW(HdfsIOException, "FileSystemImpl: not connected.");
591 }
592
593 if (NULL == renewer || !strlen(renewer)) {
594 THROW(InvalidParameter, "Invalid input: renewer should not be empty.");
595 }
596
597 Token retval = nn->getDelegationToken(renewer);
598 retval.setService(tokenService);
599 return retval.toString();
600}
601
602std::string FileSystemImpl::getDelegationToken() {
603 return getDelegationToken(key.getUser().getPrincipal().c_str());
604}
605
606int64_t FileSystemImpl::renewDelegationToken(const std::string & token) {
607 if (!nn) {
608 THROW(HdfsIOException, "FileSystemImpl: not connected.");
609 }
610
611 Token t;
612 t.fromString(token);
613 return nn->renewDelegationToken(t);
614}
615
616void FileSystemImpl::cancelDelegationToken(const std::string & token) {
617 if (!nn) {
618 THROW(HdfsIOException, "FileSystemImpl: not connected.");
619 }
620
621 Token t;
622 t.fromString(token);
623 nn->cancelDelegationToken(t);
624}
625
626void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset,
627 int64_t length, LocatedBlocks & lbs) {
628 if (!nn) {
629 THROW(HdfsIOException, "FileSystemImpl: not connected.");
630 }
631
632 nn->getBlockLocations(src, offset, length, lbs);
633}
634
635void FileSystemImpl::create(const std::string & src, const Permission & masked,
636 int flag, bool createParent, short replication, int64_t blockSize) {
637 if (!nn) {
638 THROW(HdfsIOException, "FileSystemImpl: not connected.");
639 }
640
641 nn->create(src, masked, clientName, flag, createParent, replication,
642 blockSize);
643}
644
645std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
646FileSystemImpl::append(const std::string& src) {
647 if (!nn) {
648 THROW(HdfsIOException, "FileSystemImpl: not connected.");
649 }
650
651 return nn->append(src, clientName);
652}
653
654void FileSystemImpl::abandonBlock(const ExtendedBlock & b,
655 const std::string & src) {
656 if (!nn) {
657 THROW(HdfsIOException, "FileSystemImpl: not connected.");
658 }
659
660 nn->abandonBlock(b, src, clientName);
661}
662
663shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src,
664 const ExtendedBlock * previous,
665 const std::vector<DatanodeInfo> & excludeNodes) {
666 if (!nn) {
667 THROW(HdfsIOException, "FileSystemImpl: not connected.");
668 }
669
670 return nn->addBlock(src, clientName, previous, excludeNodes);
671}
672
673shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode(
674 const std::string & src, const ExtendedBlock & blk,
675 const std::vector<DatanodeInfo> & existings,
676 const std::vector<std::string> & storageIDs,
677 const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) {
678 if (!nn) {
679 THROW(HdfsIOException, "FileSystemImpl: not connected.");
680 }
681
682 return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes,
683 numAdditionalNodes, clientName);
684}
685
686bool FileSystemImpl::complete(const std::string & src,
687 const ExtendedBlock * last) {
688 if (!nn) {
689 THROW(HdfsIOException, "FileSystemImpl: not connected.");
690 }
691
692 return nn->complete(src, clientName, last);
693}
694
695/*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) {
696 if (!nn) {
697 THROW(HdfsIOException, "FileSystemImpl: not connected.");
698 }
699
700 nn->reportBadBlocks(blocks);
701}*/
702
703void FileSystemImpl::fsync(const std::string & src) {
704 if (!nn) {
705 THROW(HdfsIOException, "FileSystemImpl: not connected.");
706 }
707
708 nn->fsync(src, clientName);
709}
710
711shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline(
712 const ExtendedBlock & block) {
713 if (!nn) {
714 THROW(HdfsIOException, "FileSystemImpl: not connected.");
715 }
716
717 return nn->updateBlockForPipeline(block, clientName);
718}
719
720void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock,
721 const ExtendedBlock & newBlock,
722 const std::vector<DatanodeInfo> & newNodes,
723 const std::vector<std::string> & storageIDs) {
724 if (!nn) {
725 THROW(HdfsIOException, "FileSystemImpl: not connected.");
726 }
727
728 nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
729}
730
731bool FileSystemImpl::getListing(const std::string & src,
732 const std::string & startAfter, bool needLocation,
733 std::vector<FileStatus> & dl) {
734 if (!nn) {
735 THROW(HdfsIOException, "FileSystemImpl: not connected.");
736 }
737
738 return nn->getListing(src, startAfter, needLocation, dl);
739}
740
741bool FileSystemImpl::renewLease() {
742 if (!nn) {
743 THROW(HdfsIOException, "FileSystemImpl: not connected.");
744 }
745
746 //protected by LeaseRenewer's lock
747 if (0 == openedOutputStream) {
748 return false;
749 }
750
751 try {
752 nn->renewLease(clientName);
753 return true;
754 } catch (const HdfsException & e) {
755 std::string buffer;
756 LOG(LOG_ERROR,
757 "Failed to renew lease for filesystem which client name is %s, since:\n%s",
758 getClientName(), GetExceptionDetail(e, buffer));
759 } catch (const std::exception & e) {
760 LOG(LOG_ERROR,
761 "Failed to renew lease for filesystem which client name is %s, since:\n%s",
762 getClientName(), e.what());
763 }
764
765 return false;
766}
767
768void FileSystemImpl::registerOpenedOutputStream() {
769 //protected by LeaseRenewer's lock
770 ++openedOutputStream;
771}
772
773bool FileSystemImpl::unregisterOpenedOutputStream() {
774 //protected by LeaseRenewer's lock
775 if (openedOutputStream > 0) {
776 --openedOutputStream;
777 }
778
779 return openedOutputStream == 0;
780}
781
782}
783}
784