| 1 | /******************************************************************** |
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Author: Zhanwei Wang |
| 6 | ********************************************************************/ |
| 7 | /******************************************************************** |
| 8 | * 2014 - |
| 9 | * open source under Apache License Version 2.0 |
| 10 | ********************************************************************/ |
| 11 | /** |
| 12 | * Licensed to the Apache Software Foundation (ASF) under one |
| 13 | * or more contributor license agreements. See the NOTICE file |
| 14 | * distributed with this work for additional information |
| 15 | * regarding copyright ownership. The ASF licenses this file |
| 16 | * to you under the Apache License, Version 2.0 (the |
| 17 | * "License"); you may not use this file except in compliance |
| 18 | * with the License. You may obtain a copy of the License at |
| 19 | * |
| 20 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | * |
| 22 | * Unless required by applicable law or agreed to in writing, software |
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | * See the License for the specific language governing permissions and |
| 26 | * limitations under the License. |
| 27 | */ |
| 28 | #include "Atomic.h" |
| 29 | #include "BlockLocation.h" |
| 30 | #include "DirectoryIterator.h" |
| 31 | #include "Exception.h" |
| 32 | #include "ExceptionInternal.h" |
| 33 | #include "FileStatus.h" |
| 34 | #include "FileSystemImpl.h" |
| 35 | #include "FileSystemStats.h" |
| 36 | #include "InputStream.h" |
| 37 | #include "LeaseRenewer.h" |
| 38 | #include "Logger.h" |
| 39 | #include "OutputStream.h" |
| 40 | #include "OutputStreamImpl.h" |
| 41 | #include "server/LocatedBlocks.h" |
| 42 | #include "server/NamenodeInfo.h" |
| 43 | #include "server/NamenodeProxy.h" |
| 44 | #include "StringUtil.h" |
| 45 | |
| 46 | #include <cstring> |
| 47 | #include <inttypes.h> |
| 48 | #include <libxml/uri.h> |
| 49 | #include <strings.h> |
| 50 | |
| 51 | namespace Hdfs { |
| 52 | namespace Internal { |
| 53 | |
| 54 | static const std::string GetAbsPath(const std::string & prefix, |
| 55 | const std::string & path) { |
| 56 | if (path.empty()) { |
| 57 | return prefix; |
| 58 | } |
| 59 | |
| 60 | if ('/' == path[0]) { |
| 61 | return path; |
| 62 | } else { |
| 63 | return prefix + "/" + path; |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | /* |
| 68 | * Return the canonical absolute name of file NAME. |
| 69 | * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/') |
| 70 | */ |
| 71 | static const std::string CanonicalizePath(const std::string & path) { |
| 72 | int skip = 0; |
| 73 | std::string retval; |
| 74 | std::vector<std::string> components = StringSplit(path, "/" ); |
| 75 | std::deque<std::string> tmp; |
| 76 | std::vector<std::string>::reverse_iterator s = components.rbegin(); |
| 77 | |
| 78 | while (s != components.rend()) { |
| 79 | if (s->empty() || *s == "." ) { |
| 80 | ++s; |
| 81 | } else if (*s == ".." ) { |
| 82 | ++skip; |
| 83 | ++s; |
| 84 | } else { |
| 85 | if (skip <= 0) { |
| 86 | tmp.push_front(*s); |
| 87 | } else { |
| 88 | --skip; |
| 89 | } |
| 90 | |
| 91 | ++s; |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | for (size_t i = 0; i < tmp.size(); ++i) { |
| 96 | retval += "/" ; |
| 97 | retval += tmp[i]; |
| 98 | } |
| 99 | |
| 100 | return retval.empty() ? "/" : retval; |
| 101 | } |
| 102 | |
| 103 | FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c) |
| 104 | : conf(c), |
| 105 | key(key), |
| 106 | openedOutputStream(0), |
| 107 | nn(NULL), |
| 108 | sconf(c), |
| 109 | user(key.getUser()) { |
| 110 | static atomic<uint32_t> count(0); |
| 111 | std::stringstream ss; |
| 112 | ss.imbue(std::locale::classic()); |
| 113 | srand((unsigned int) time(NULL)); |
| 114 | ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_" |
| 115 | << getpid() << "_tid_" << pthread_self(); |
| 116 | clientName = ss.str(); |
| 117 | workingDir = std::string("/user/" ) + user.getEffectiveUser(); |
| 118 | peerCache = shared_ptr<PeerCache>(new PeerCache(sconf)); |
| 119 | #ifdef MOCK |
| 120 | stub = NULL; |
| 121 | #endif |
| 122 | //set log level |
| 123 | RootLogger.setLogSeverity(sconf.getLogSeverity()); |
| 124 | } |
| 125 | |
| 126 | /** |
| 127 | * Destroy a FileSystemBase instance |
| 128 | */ |
| 129 | FileSystemImpl::~FileSystemImpl() { |
| 130 | try { |
| 131 | disconnect(); |
| 132 | } catch (...) { |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | const std::string FileSystemImpl::getStandardPath(const char * path) { |
| 137 | std::string base; |
| 138 | { |
| 139 | lock_guard<mutex> lock(mutWorkingDir); |
| 140 | base = workingDir; |
| 141 | } |
| 142 | return CanonicalizePath(GetAbsPath(base, path)); |
| 143 | } |
| 144 | |
| 145 | const char * FileSystemImpl::getClientName() { |
| 146 | return clientName.c_str(); |
| 147 | } |
| 148 | |
| 149 | void FileSystemImpl::connect() { |
| 150 | std::string host, port, uri; |
| 151 | std::vector<NamenodeInfo> namenodeInfos; |
| 152 | |
| 153 | if (nn) { |
| 154 | THROW(HdfsIOException, "FileSystemImpl: already connected." ); |
| 155 | } |
| 156 | |
| 157 | host = key.getHost(); |
| 158 | port = key.getPort(); |
| 159 | uri += key.getScheme() + "://" + host; |
| 160 | |
| 161 | if (port.empty()) { |
| 162 | try { |
| 163 | namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf); |
| 164 | } catch (const HdfsConfigNotFound & e) { |
| 165 | NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration" , uri.c_str()); |
| 166 | } |
| 167 | |
| 168 | tokenService = "ha-hdfs:" ; |
| 169 | tokenService += host; |
| 170 | } else { |
| 171 | std::stringstream ss; |
| 172 | ss.imbue(std::locale::classic()); |
| 173 | ss << host << ":" << port; |
| 174 | namenodeInfos.resize(1); |
| 175 | namenodeInfos[0].setRpcAddr(ss.str()); |
| 176 | tokenService = namenodeInfos[0].getRpcAddr(); |
| 177 | } |
| 178 | |
| 179 | #ifdef MOCK |
| 180 | nn = stub->getNamenode(); |
| 181 | #else |
| 182 | nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod()))); |
| 183 | #endif |
| 184 | /* |
| 185 | * To test if the connection is ok |
| 186 | */ |
| 187 | getFsStats(); |
| 188 | } |
| 189 | |
| 190 | /** |
| 191 | * disconnect from hdfs |
| 192 | */ |
| 193 | void FileSystemImpl::disconnect() { |
| 194 | if (nn) { |
| 195 | nn->close(); |
| 196 | delete nn; |
| 197 | } |
| 198 | |
| 199 | nn = NULL; |
| 200 | } |
| 201 | |
| 202 | /** |
| 203 | * To get default number of replication. |
| 204 | * @return the default number of replication. |
| 205 | */ |
| 206 | int FileSystemImpl::getDefaultReplication() const { |
| 207 | return sconf.getDefaultReplica(); |
| 208 | } |
| 209 | |
| 210 | /** |
| 211 | * To get the default number of block size. |
| 212 | * @return the default block size. |
| 213 | */ |
| 214 | int64_t FileSystemImpl::getDefaultBlockSize() const { |
| 215 | return sconf.getDefaultBlockSize(); |
| 216 | } |
| 217 | |
| 218 | /** |
| 219 | * To get the home directory. |
| 220 | * @return home directory. |
| 221 | */ |
| 222 | std::string FileSystemImpl::getHomeDirectory() const { |
| 223 | return std::string("/user/" ) + user.getEffectiveUser(); |
| 224 | } |
| 225 | |
| 226 | /** |
| 227 | * To delete a file or directory. |
| 228 | * @param path the path to be deleted. |
| 229 | * @param recursive if path is a directory, delete the contents recursively. |
| 230 | * @return return true if success. |
| 231 | */ |
| 232 | |
| 233 | bool FileSystemImpl::deletePath(const char * path, bool recursive) { |
| 234 | if (!nn) { |
| 235 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 236 | } |
| 237 | |
| 238 | if (NULL == path || !strlen(path)) { |
| 239 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 240 | } |
| 241 | |
| 242 | return nn->deleteFile(getStandardPath(path), recursive); |
| 243 | } |
| 244 | |
| 245 | /** |
| 246 | * To create a directory which given permission. |
| 247 | * @param path the directory path which is to be created. |
| 248 | * @param permission directory permission. |
| 249 | * @return return true if success. |
| 250 | */ |
| 251 | |
| 252 | bool FileSystemImpl::mkdir(const char * path, const Permission & permission) { |
| 253 | if (!nn) { |
| 254 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 255 | } |
| 256 | |
| 257 | if (NULL == path || !strlen(path)) { |
| 258 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 259 | } |
| 260 | |
| 261 | return nn->mkdirs(getStandardPath(path), permission, false); |
| 262 | } |
| 263 | |
| 264 | /** |
| 265 | * To create a directory which given permission. |
| 266 | * If parent path does not exits, create it. |
| 267 | * @param path the directory path which is to be created. |
| 268 | * @param permission directory permission. |
| 269 | * @return return true if success. |
| 270 | */ |
| 271 | |
| 272 | bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) { |
| 273 | if (!nn) { |
| 274 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 275 | } |
| 276 | |
| 277 | if (NULL == path || !strlen(path)) { |
| 278 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 279 | } |
| 280 | |
| 281 | return nn->mkdirs(getStandardPath(path), permission, true); |
| 282 | } |
| 283 | |
| 284 | /** |
| 285 | * To get path information. |
| 286 | * @param path the path which information is to be returned. |
| 287 | * @return the path information. |
| 288 | */ |
| 289 | FileStatus FileSystemImpl::getFileStatus(const char * path) { |
| 290 | if (!nn) { |
| 291 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 292 | } |
| 293 | |
| 294 | if (NULL == path || !strlen(path)) { |
| 295 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 296 | } |
| 297 | |
| 298 | return nn->getFileInfo(getStandardPath(path)); |
| 299 | } |
| 300 | |
| 301 | static void Convert(BlockLocation & bl, const LocatedBlock & lb) { |
| 302 | const std::vector<DatanodeInfo> & nodes = lb.getLocations(); |
| 303 | bl.setCorrupt(lb.isCorrupt()); |
| 304 | bl.setLength(lb.getNumBytes()); |
| 305 | bl.setOffset(lb.getOffset()); |
| 306 | std::vector<std::string> hosts(nodes.size()); |
| 307 | std::vector<std::string> names(nodes.size()); |
| 308 | std::vector<std::string> topologyPaths(nodes.size()); |
| 309 | |
| 310 | for (size_t i = 0 ; i < nodes.size() ; ++i) { |
| 311 | hosts[i] = nodes[i].getHostName(); |
| 312 | names[i] = nodes[i].getXferAddr(); |
| 313 | topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr(); |
| 314 | } |
| 315 | |
| 316 | bl.setNames(names); |
| 317 | bl.setHosts(hosts); |
| 318 | bl.setTopologyPaths(topologyPaths); |
| 319 | } |
| 320 | |
| 321 | std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations( |
| 322 | const char * path, int64_t start, int64_t len) { |
| 323 | if (!nn) { |
| 324 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 325 | } |
| 326 | |
| 327 | if (NULL == path || !strlen(path)) { |
| 328 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 329 | } |
| 330 | |
| 331 | if (start < 0) { |
| 332 | THROW(InvalidParameter, "Invalid input: start offset should be positive" ); |
| 333 | } |
| 334 | |
| 335 | if (len < 0) { |
| 336 | THROW(InvalidParameter, "Invalid input: length should be positive" ); |
| 337 | } |
| 338 | |
| 339 | LocatedBlocksImpl lbs; |
| 340 | nn->getBlockLocations(getStandardPath(path), start, len, lbs); |
| 341 | std::vector<LocatedBlock> blocks = lbs.getBlocks(); |
| 342 | std::vector<BlockLocation> retval(blocks.size()); |
| 343 | |
| 344 | for (size_t i = 0; i < blocks.size(); ++i) { |
| 345 | Convert(retval[i], blocks[i]); |
| 346 | } |
| 347 | |
| 348 | return retval; |
| 349 | } |
| 350 | |
| 351 | /** |
| 352 | * list the contents of a directory. |
| 353 | * @param path the directory path. |
| 354 | * @return return the path informations in the given directory. |
| 355 | */ |
| 356 | DirectoryIterator FileSystemImpl::listDirectory(const char * path, |
| 357 | bool needLocation) { |
| 358 | if (!nn) { |
| 359 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 360 | } |
| 361 | |
| 362 | if (NULL == path || !strlen(path)) { |
| 363 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 364 | } |
| 365 | |
| 366 | return DirectoryIterator(this, getStandardPath(path), needLocation); |
| 367 | } |
| 368 | |
| 369 | /** |
| 370 | * list all the contents of a directory. |
| 371 | * @param path The directory path. |
| 372 | * @return Return a vector of file informations in the directory. |
| 373 | */ |
| 374 | std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path, |
| 375 | bool needLocation) { |
| 376 | if (!nn) { |
| 377 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 378 | } |
| 379 | |
| 380 | if (NULL == path || !strlen(path)) { |
| 381 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 382 | } |
| 383 | |
| 384 | std::string startAfter; |
| 385 | std::string p = getStandardPath(path); |
| 386 | std::vector<FileStatus> retval; |
| 387 | |
| 388 | while (getListing(p, startAfter, needLocation, retval)) { |
| 389 | startAfter = retval.back().getPath(); |
| 390 | } |
| 391 | |
| 392 | return retval; |
| 393 | } |
| 394 | |
| 395 | /** |
| 396 | * To set the owner and the group of the path. |
| 397 | * username and groupname cannot be empty at the same time. |
| 398 | * @param path the path which owner of group is to be changed. |
| 399 | * @param username new user name. |
| 400 | * @param groupname new group. |
| 401 | */ |
| 402 | void FileSystemImpl::setOwner(const char * path, const char * username, |
| 403 | const char * groupname) { |
| 404 | if (!nn) { |
| 405 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 406 | } |
| 407 | |
| 408 | if (NULL == path || !strlen(path)) { |
| 409 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 410 | } |
| 411 | |
| 412 | if ((NULL == username || !strlen(username)) |
| 413 | && (NULL == groupname || !strlen(groupname))) { |
| 414 | THROW(InvalidParameter, |
| 415 | "Invalid input: username and groupname should not be empty" ); |
| 416 | } |
| 417 | |
| 418 | nn->setOwner(getStandardPath(path), username != NULL ? username : "" , |
| 419 | groupname != NULL ? groupname : "" ); |
| 420 | } |
| 421 | |
| 422 | /** |
| 423 | * To set the access time or modification time of a path. |
| 424 | * @param path the path which access time or modification time is to be changed. |
| 425 | * @param mtime new modification time. |
| 426 | * @param atime new access time. |
| 427 | */ |
| 428 | void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) { |
| 429 | if (!nn) { |
| 430 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 431 | } |
| 432 | |
| 433 | if (NULL == path || !strlen(path)) { |
| 434 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 435 | } |
| 436 | |
| 437 | nn->setTimes(getStandardPath(path), mtime, atime); |
| 438 | } |
| 439 | |
| 440 | /** |
| 441 | * To set the permission of a path. |
| 442 | * @param path the path which permission is to be changed. |
| 443 | * @param permission new permission. |
| 444 | */ |
| 445 | void FileSystemImpl::setPermission(const char * path, |
| 446 | const Permission & permission) { |
| 447 | if (!nn) { |
| 448 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 449 | } |
| 450 | |
| 451 | if (NULL == path || !strlen(path)) { |
| 452 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 453 | } |
| 454 | |
| 455 | nn->setPermission(getStandardPath(path), permission); |
| 456 | } |
| 457 | |
| 458 | /** |
| 459 | * To set the number of replication. |
| 460 | * @param path the path which number of replication is to be changed. |
| 461 | * @param replication new number of replication. |
| 462 | * @return return true if success. |
| 463 | */ |
| 464 | |
| 465 | bool FileSystemImpl::setReplication(const char * path, short replication) { |
| 466 | if (!nn) { |
| 467 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 468 | } |
| 469 | |
| 470 | if (NULL == path || !strlen(path)) { |
| 471 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 472 | } |
| 473 | |
| 474 | return nn->setReplication(getStandardPath(path), replication); |
| 475 | } |
| 476 | |
| 477 | /** |
| 478 | * To rename a path. |
| 479 | * @param src old path. |
| 480 | * @param dst new path. |
| 481 | * @return return true if success. |
| 482 | */ |
| 483 | |
| 484 | bool FileSystemImpl::rename(const char * src, const char * dst) { |
| 485 | if (!nn) { |
| 486 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 487 | } |
| 488 | |
| 489 | if (NULL == src || !strlen(src)) { |
| 490 | THROW(InvalidParameter, "Invalid input: src should not be empty" ); |
| 491 | } |
| 492 | |
| 493 | if (NULL == dst || !strlen(dst)) { |
| 494 | THROW(InvalidParameter, "Invalid input: dst should not be empty" ); |
| 495 | } |
| 496 | |
| 497 | return nn->rename(getStandardPath(src), getStandardPath(dst)); |
| 498 | } |
| 499 | |
| 500 | /** |
| 501 | * To set working directory. |
| 502 | * @param path new working directory. |
| 503 | */ |
| 504 | void FileSystemImpl::setWorkingDirectory(const char * path) { |
| 505 | if (NULL == path) { |
| 506 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 507 | } |
| 508 | |
| 509 | if (!strlen(path) || '/' != path[0]) { |
| 510 | THROW(InvalidParameter, |
| 511 | "Invalid input: path should be an absolute path" ); |
| 512 | } |
| 513 | |
| 514 | lock_guard<mutex> lock(mutWorkingDir); |
| 515 | workingDir = path; |
| 516 | } |
| 517 | |
| 518 | /** |
| 519 | * To get working directory. |
| 520 | * @return working directory. |
| 521 | */ |
| 522 | std::string FileSystemImpl::getWorkingDirectory() const { |
| 523 | return workingDir; |
| 524 | } |
| 525 | |
| 526 | /** |
| 527 | * To test if the path exist. |
| 528 | * @param path the path which is to be tested. |
| 529 | * @return return true if the path exist. |
| 530 | */ |
| 531 | |
| 532 | bool FileSystemImpl::exist(const char * path) { |
| 533 | if (!nn) { |
| 534 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 535 | } |
| 536 | |
| 537 | if (NULL == path || !strlen(path)) { |
| 538 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
| 539 | } |
| 540 | |
| 541 | try { |
| 542 | getFileStatus(path); |
| 543 | } catch (const FileNotFoundException & e) { |
| 544 | return false; |
| 545 | } |
| 546 | |
| 547 | return true; |
| 548 | } |
| 549 | |
| 550 | /** |
| 551 | * To get the file system status. |
| 552 | * @return the file system status. |
| 553 | */ |
| 554 | FileSystemStats FileSystemImpl::getFsStats() { |
| 555 | if (!nn) { |
| 556 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 557 | } |
| 558 | |
| 559 | std::vector<int64_t> retval = nn->getFsStats(); |
| 560 | assert(retval.size() >= 3); |
| 561 | return FileSystemStats(retval[0], retval[1], retval[2]); |
| 562 | } |
| 563 | |
| 564 | /** |
| 565 | * Truncate the file in the indicated path to the indicated size. |
| 566 | * @param path The path to the file to be truncated |
| 567 | * @param size The size the file is to be truncated to |
| 568 | * |
| 569 | * @return true if and client does not need to wait for block recovery, |
| 570 | * false if client needs to wait for block recovery. |
| 571 | */ |
| 572 | bool FileSystemImpl::truncate(const char * path, int64_t size) { |
| 573 | LOG(DEBUG1, "truncate file %s to length %" PRId64, path, size); |
| 574 | |
| 575 | if (!nn) { |
| 576 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 577 | } |
| 578 | |
| 579 | if (NULL == path || !strlen(path)) { |
| 580 | THROW(InvalidParameter, "Invalid input: src should not be empty." ); |
| 581 | } |
| 582 | |
| 583 | std::string absPath = getStandardPath(path); |
| 584 | |
| 585 | return nn->truncate(absPath, size, clientName); |
| 586 | } |
| 587 | |
| 588 | std::string FileSystemImpl::getDelegationToken(const char * renewer) { |
| 589 | if (!nn) { |
| 590 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 591 | } |
| 592 | |
| 593 | if (NULL == renewer || !strlen(renewer)) { |
| 594 | THROW(InvalidParameter, "Invalid input: renewer should not be empty." ); |
| 595 | } |
| 596 | |
| 597 | Token retval = nn->getDelegationToken(renewer); |
| 598 | retval.setService(tokenService); |
| 599 | return retval.toString(); |
| 600 | } |
| 601 | |
| 602 | std::string FileSystemImpl::getDelegationToken() { |
| 603 | return getDelegationToken(key.getUser().getPrincipal().c_str()); |
| 604 | } |
| 605 | |
| 606 | int64_t FileSystemImpl::renewDelegationToken(const std::string & token) { |
| 607 | if (!nn) { |
| 608 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 609 | } |
| 610 | |
| 611 | Token t; |
| 612 | t.fromString(token); |
| 613 | return nn->renewDelegationToken(t); |
| 614 | } |
| 615 | |
| 616 | void FileSystemImpl::cancelDelegationToken(const std::string & token) { |
| 617 | if (!nn) { |
| 618 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 619 | } |
| 620 | |
| 621 | Token t; |
| 622 | t.fromString(token); |
| 623 | nn->cancelDelegationToken(t); |
| 624 | } |
| 625 | |
| 626 | void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset, |
| 627 | int64_t length, LocatedBlocks & lbs) { |
| 628 | if (!nn) { |
| 629 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 630 | } |
| 631 | |
| 632 | nn->getBlockLocations(src, offset, length, lbs); |
| 633 | } |
| 634 | |
| 635 | void FileSystemImpl::create(const std::string & src, const Permission & masked, |
| 636 | int flag, bool createParent, short replication, int64_t blockSize) { |
| 637 | if (!nn) { |
| 638 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 639 | } |
| 640 | |
| 641 | nn->create(src, masked, clientName, flag, createParent, replication, |
| 642 | blockSize); |
| 643 | } |
| 644 | |
| 645 | std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > |
| 646 | FileSystemImpl::append(const std::string& src) { |
| 647 | if (!nn) { |
| 648 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 649 | } |
| 650 | |
| 651 | return nn->append(src, clientName); |
| 652 | } |
| 653 | |
| 654 | void FileSystemImpl::abandonBlock(const ExtendedBlock & b, |
| 655 | const std::string & src) { |
| 656 | if (!nn) { |
| 657 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 658 | } |
| 659 | |
| 660 | nn->abandonBlock(b, src, clientName); |
| 661 | } |
| 662 | |
| 663 | shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src, |
| 664 | const ExtendedBlock * previous, |
| 665 | const std::vector<DatanodeInfo> & excludeNodes) { |
| 666 | if (!nn) { |
| 667 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 668 | } |
| 669 | |
| 670 | return nn->addBlock(src, clientName, previous, excludeNodes); |
| 671 | } |
| 672 | |
| 673 | shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode( |
| 674 | const std::string & src, const ExtendedBlock & blk, |
| 675 | const std::vector<DatanodeInfo> & existings, |
| 676 | const std::vector<std::string> & storageIDs, |
| 677 | const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) { |
| 678 | if (!nn) { |
| 679 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 680 | } |
| 681 | |
| 682 | return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes, |
| 683 | numAdditionalNodes, clientName); |
| 684 | } |
| 685 | |
| 686 | bool FileSystemImpl::complete(const std::string & src, |
| 687 | const ExtendedBlock * last) { |
| 688 | if (!nn) { |
| 689 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 690 | } |
| 691 | |
| 692 | return nn->complete(src, clientName, last); |
| 693 | } |
| 694 | |
| 695 | /*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) { |
| 696 | if (!nn) { |
| 697 | THROW(HdfsIOException, "FileSystemImpl: not connected."); |
| 698 | } |
| 699 | |
| 700 | nn->reportBadBlocks(blocks); |
| 701 | }*/ |
| 702 | |
| 703 | void FileSystemImpl::fsync(const std::string & src) { |
| 704 | if (!nn) { |
| 705 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 706 | } |
| 707 | |
| 708 | nn->fsync(src, clientName); |
| 709 | } |
| 710 | |
| 711 | shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline( |
| 712 | const ExtendedBlock & block) { |
| 713 | if (!nn) { |
| 714 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 715 | } |
| 716 | |
| 717 | return nn->updateBlockForPipeline(block, clientName); |
| 718 | } |
| 719 | |
| 720 | void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock, |
| 721 | const ExtendedBlock & newBlock, |
| 722 | const std::vector<DatanodeInfo> & newNodes, |
| 723 | const std::vector<std::string> & storageIDs) { |
| 724 | if (!nn) { |
| 725 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 726 | } |
| 727 | |
| 728 | nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs); |
| 729 | } |
| 730 | |
| 731 | bool FileSystemImpl::getListing(const std::string & src, |
| 732 | const std::string & startAfter, bool needLocation, |
| 733 | std::vector<FileStatus> & dl) { |
| 734 | if (!nn) { |
| 735 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 736 | } |
| 737 | |
| 738 | return nn->getListing(src, startAfter, needLocation, dl); |
| 739 | } |
| 740 | |
| 741 | bool FileSystemImpl::renewLease() { |
| 742 | if (!nn) { |
| 743 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
| 744 | } |
| 745 | |
| 746 | //protected by LeaseRenewer's lock |
| 747 | if (0 == openedOutputStream) { |
| 748 | return false; |
| 749 | } |
| 750 | |
| 751 | try { |
| 752 | nn->renewLease(clientName); |
| 753 | return true; |
| 754 | } catch (const HdfsException & e) { |
| 755 | std::string buffer; |
| 756 | LOG(LOG_ERROR, |
| 757 | "Failed to renew lease for filesystem which client name is %s, since:\n%s" , |
| 758 | getClientName(), GetExceptionDetail(e, buffer)); |
| 759 | } catch (const std::exception & e) { |
| 760 | LOG(LOG_ERROR, |
| 761 | "Failed to renew lease for filesystem which client name is %s, since:\n%s" , |
| 762 | getClientName(), e.what()); |
| 763 | } |
| 764 | |
| 765 | return false; |
| 766 | } |
| 767 | |
| 768 | void FileSystemImpl::registerOpenedOutputStream() { |
| 769 | //protected by LeaseRenewer's lock |
| 770 | ++openedOutputStream; |
| 771 | } |
| 772 | |
| 773 | bool FileSystemImpl::unregisterOpenedOutputStream() { |
| 774 | //protected by LeaseRenewer's lock |
| 775 | if (openedOutputStream > 0) { |
| 776 | --openedOutputStream; |
| 777 | } |
| 778 | |
| 779 | return openedOutputStream == 0; |
| 780 | } |
| 781 | |
| 782 | } |
| 783 | } |
| 784 | |