| 1 | /******************************************************************** | 
|---|
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. | 
|---|
| 3 | * All rights reserved. | 
|---|
| 4 | * | 
|---|
| 5 | * Author: Zhanwei Wang | 
|---|
| 6 | ********************************************************************/ | 
|---|
| 7 | /******************************************************************** | 
|---|
| 8 | * 2014 - | 
|---|
| 9 | * open source under Apache License Version 2.0 | 
|---|
| 10 | ********************************************************************/ | 
|---|
| 11 | /** | 
|---|
| 12 | * Licensed to the Apache Software Foundation (ASF) under one | 
|---|
| 13 | * or more contributor license agreements.  See the NOTICE file | 
|---|
| 14 | * distributed with this work for additional information | 
|---|
| 15 | * regarding copyright ownership.  The ASF licenses this file | 
|---|
| 16 | * to you under the Apache License, Version 2.0 (the | 
|---|
| 17 | * "License"); you may not use this file except in compliance | 
|---|
| 18 | * with the License.  You may obtain a copy of the License at | 
|---|
| 19 | * | 
|---|
| 20 | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|---|
| 21 | * | 
|---|
| 22 | * Unless required by applicable law or agreed to in writing, software | 
|---|
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, | 
|---|
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|---|
| 25 | * See the License for the specific language governing permissions and | 
|---|
| 26 | * limitations under the License. | 
|---|
| 27 | */ | 
|---|
| 28 | #include "Atomic.h" | 
|---|
| 29 | #include "BlockLocation.h" | 
|---|
| 30 | #include "DirectoryIterator.h" | 
|---|
| 31 | #include "Exception.h" | 
|---|
| 32 | #include "ExceptionInternal.h" | 
|---|
| 33 | #include "FileStatus.h" | 
|---|
| 34 | #include "FileSystemImpl.h" | 
|---|
| 35 | #include "FileSystemStats.h" | 
|---|
| 36 | #include "InputStream.h" | 
|---|
| 37 | #include "LeaseRenewer.h" | 
|---|
| 38 | #include "Logger.h" | 
|---|
| 39 | #include "OutputStream.h" | 
|---|
| 40 | #include "OutputStreamImpl.h" | 
|---|
| 41 | #include "server/LocatedBlocks.h" | 
|---|
| 42 | #include "server/NamenodeInfo.h" | 
|---|
| 43 | #include "server/NamenodeProxy.h" | 
|---|
| 44 | #include "StringUtil.h" | 
|---|
| 45 |  | 
|---|
| 46 | #include <cstring> | 
|---|
| 47 | #include <inttypes.h> | 
|---|
| 48 | #include <libxml/uri.h> | 
|---|
| 49 | #include <strings.h> | 
|---|
| 50 |  | 
|---|
| 51 | namespace Hdfs { | 
|---|
| 52 | namespace Internal { | 
|---|
| 53 |  | 
|---|
/*
 * Resolve PATH against PREFIX.
 * An empty PATH yields PREFIX unchanged, a PATH that starts with '/' is
 * returned as is, and any other PATH is appended to PREFIX after a single
 * '/' separator. No normalization is performed here.
 */
static const std::string GetAbsPath(const std::string & prefix,
                                    const std::string & path) {
    const bool isAbsolute = !path.empty() && '/' == path[0];

    if (isAbsolute) {
        return path;
    }

    std::string joined(prefix);

    if (!path.empty()) {
        joined += "/";
        joined += path;
    }

    return joined;
}
|---|
| 66 |  | 
|---|
| 67 | /* | 
|---|
| 68 | * Return the canonical absolute name of file NAME. | 
|---|
| 69 | * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/') | 
|---|
| 70 | */ | 
|---|
| 71 | static const std::string CanonicalizePath(const std::string & path) { | 
|---|
| 72 | int skip = 0; | 
|---|
| 73 | std::string retval; | 
|---|
| 74 | std::vector<std::string> components = StringSplit(path, "/"); | 
|---|
| 75 | std::deque<std::string> tmp; | 
|---|
| 76 | std::vector<std::string>::reverse_iterator s = components.rbegin(); | 
|---|
| 77 |  | 
|---|
| 78 | while (s != components.rend()) { | 
|---|
| 79 | if (s->empty() || *s == ".") { | 
|---|
| 80 | ++s; | 
|---|
| 81 | } else if (*s == "..") { | 
|---|
| 82 | ++skip; | 
|---|
| 83 | ++s; | 
|---|
| 84 | } else { | 
|---|
| 85 | if (skip <= 0) { | 
|---|
| 86 | tmp.push_front(*s); | 
|---|
| 87 | } else { | 
|---|
| 88 | --skip; | 
|---|
| 89 | } | 
|---|
| 90 |  | 
|---|
| 91 | ++s; | 
|---|
| 92 | } | 
|---|
| 93 | } | 
|---|
| 94 |  | 
|---|
| 95 | for (size_t i = 0; i < tmp.size(); ++i) { | 
|---|
| 96 | retval += "/"; | 
|---|
| 97 | retval += tmp[i]; | 
|---|
| 98 | } | 
|---|
| 99 |  | 
|---|
| 100 | return retval.empty() ? "/": retval; | 
|---|
| 101 | } | 
|---|
| 102 |  | 
|---|
| 103 | FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c) | 
|---|
| 104 | : conf(c), | 
|---|
| 105 | key(key), | 
|---|
| 106 | openedOutputStream(0), | 
|---|
| 107 | nn(NULL), | 
|---|
| 108 | sconf(c), | 
|---|
| 109 | user(key.getUser()) { | 
|---|
| 110 | static atomic<uint32_t> count(0); | 
|---|
| 111 | std::stringstream ss; | 
|---|
| 112 | ss.imbue(std::locale::classic()); | 
|---|
| 113 | srand((unsigned int) time(NULL)); | 
|---|
| 114 | ss << "libhdfs3_client_random_"<< rand() << "_count_"<< ++count << "_pid_" | 
|---|
| 115 | << getpid() << "_tid_"<< pthread_self(); | 
|---|
| 116 | clientName = ss.str(); | 
|---|
| 117 | workingDir = std::string( "/user/") + user.getEffectiveUser(); | 
|---|
| 118 | peerCache = shared_ptr<PeerCache>(new PeerCache(sconf)); | 
|---|
| 119 | #ifdef MOCK | 
|---|
| 120 | stub = NULL; | 
|---|
| 121 | #endif | 
|---|
| 122 | //set log level | 
|---|
| 123 | RootLogger.setLogSeverity(sconf.getLogSeverity()); | 
|---|
| 124 | } | 
|---|
| 125 |  | 
|---|
| 126 | /** | 
|---|
| 127 | * Destroy a FileSystemBase instance | 
|---|
| 128 | */ | 
|---|
| 129 | FileSystemImpl::~FileSystemImpl() { | 
|---|
| 130 | try { | 
|---|
| 131 | disconnect(); | 
|---|
| 132 | } catch (...) { | 
|---|
| 133 | } | 
|---|
| 134 | } | 
|---|
| 135 |  | 
|---|
| 136 | const std::string FileSystemImpl::getStandardPath(const char * path) { | 
|---|
| 137 | std::string base; | 
|---|
| 138 | { | 
|---|
| 139 | lock_guard<mutex> lock(mutWorkingDir); | 
|---|
| 140 | base = workingDir; | 
|---|
| 141 | } | 
|---|
| 142 | return CanonicalizePath(GetAbsPath(base, path)); | 
|---|
| 143 | } | 
|---|
| 144 |  | 
|---|
| 145 | const char * FileSystemImpl::getClientName() { | 
|---|
| 146 | return clientName.c_str(); | 
|---|
| 147 | } | 
|---|
| 148 |  | 
|---|
| 149 | void FileSystemImpl::connect() { | 
|---|
| 150 | std::string host, port, uri; | 
|---|
| 151 | std::vector<NamenodeInfo> namenodeInfos; | 
|---|
| 152 |  | 
|---|
| 153 | if (nn) { | 
|---|
| 154 | THROW(HdfsIOException, "FileSystemImpl: already connected."); | 
|---|
| 155 | } | 
|---|
| 156 |  | 
|---|
| 157 | host = key.getHost(); | 
|---|
| 158 | port = key.getPort(); | 
|---|
| 159 | uri += key.getScheme() + "://"+ host; | 
|---|
| 160 |  | 
|---|
| 161 | if (port.empty()) { | 
|---|
| 162 | try { | 
|---|
| 163 | namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf); | 
|---|
| 164 | } catch (const HdfsConfigNotFound & e) { | 
|---|
| 165 | NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration", uri.c_str()); | 
|---|
| 166 | } | 
|---|
| 167 |  | 
|---|
| 168 | tokenService = "ha-hdfs:"; | 
|---|
| 169 | tokenService += host; | 
|---|
| 170 | } else { | 
|---|
| 171 | std::stringstream ss; | 
|---|
| 172 | ss.imbue(std::locale::classic()); | 
|---|
| 173 | ss << host << ":"<< port; | 
|---|
| 174 | namenodeInfos.resize(1); | 
|---|
| 175 | namenodeInfos[0].setRpcAddr(ss.str()); | 
|---|
| 176 | tokenService = namenodeInfos[0].getRpcAddr(); | 
|---|
| 177 | } | 
|---|
| 178 |  | 
|---|
| 179 | #ifdef MOCK | 
|---|
| 180 | nn = stub->getNamenode(); | 
|---|
| 181 | #else | 
|---|
| 182 | nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod()))); | 
|---|
| 183 | #endif | 
|---|
| 184 | /* | 
|---|
| 185 | * To test if the connection is ok | 
|---|
| 186 | */ | 
|---|
| 187 | getFsStats(); | 
|---|
| 188 | } | 
|---|
| 189 |  | 
|---|
| 190 | /** | 
|---|
| 191 | * disconnect from hdfs | 
|---|
| 192 | */ | 
|---|
| 193 | void FileSystemImpl::disconnect() { | 
|---|
| 194 | if (nn) { | 
|---|
| 195 | nn->close(); | 
|---|
| 196 | delete nn; | 
|---|
| 197 | } | 
|---|
| 198 |  | 
|---|
| 199 | nn = NULL; | 
|---|
| 200 | } | 
|---|
| 201 |  | 
|---|
| 202 | /** | 
|---|
| 203 | * To get default number of replication. | 
|---|
| 204 | * @return the default number of replication. | 
|---|
| 205 | */ | 
|---|
| 206 | int FileSystemImpl::getDefaultReplication() const { | 
|---|
| 207 | return sconf.getDefaultReplica(); | 
|---|
| 208 | } | 
|---|
| 209 |  | 
|---|
| 210 | /** | 
|---|
| 211 | * To get the default number of block size. | 
|---|
| 212 | * @return the default block size. | 
|---|
| 213 | */ | 
|---|
| 214 | int64_t FileSystemImpl::getDefaultBlockSize() const { | 
|---|
| 215 | return sconf.getDefaultBlockSize(); | 
|---|
| 216 | } | 
|---|
| 217 |  | 
|---|
| 218 | /** | 
|---|
| 219 | * To get the home directory. | 
|---|
| 220 | * @return home directory. | 
|---|
| 221 | */ | 
|---|
| 222 | std::string FileSystemImpl::getHomeDirectory() const { | 
|---|
| 223 | return std::string( "/user/") + user.getEffectiveUser(); | 
|---|
| 224 | } | 
|---|
| 225 |  | 
|---|
| 226 | /** | 
|---|
| 227 | * To delete a file or directory. | 
|---|
| 228 | * @param path the path to be deleted. | 
|---|
| 229 | * @param recursive if path is a directory, delete the contents recursively. | 
|---|
| 230 | * @return return true if success. | 
|---|
| 231 | */ | 
|---|
| 232 |  | 
|---|
| 233 | bool FileSystemImpl::deletePath(const char * path, bool recursive) { | 
|---|
| 234 | if (!nn) { | 
|---|
| 235 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 236 | } | 
|---|
| 237 |  | 
|---|
| 238 | if (NULL == path || !strlen(path)) { | 
|---|
| 239 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 240 | } | 
|---|
| 241 |  | 
|---|
| 242 | return nn->deleteFile(getStandardPath(path), recursive); | 
|---|
| 243 | } | 
|---|
| 244 |  | 
|---|
| 245 | /** | 
|---|
| 246 | * To create a directory which given permission. | 
|---|
| 247 | * @param path the directory path which is to be created. | 
|---|
| 248 | * @param permission directory permission. | 
|---|
| 249 | * @return return true if success. | 
|---|
| 250 | */ | 
|---|
| 251 |  | 
|---|
| 252 | bool FileSystemImpl::mkdir(const char * path, const Permission & permission) { | 
|---|
| 253 | if (!nn) { | 
|---|
| 254 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 255 | } | 
|---|
| 256 |  | 
|---|
| 257 | if (NULL == path || !strlen(path)) { | 
|---|
| 258 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 259 | } | 
|---|
| 260 |  | 
|---|
| 261 | return nn->mkdirs(getStandardPath(path), permission, false); | 
|---|
| 262 | } | 
|---|
| 263 |  | 
|---|
| 264 | /** | 
|---|
| 265 | * To create a directory which given permission. | 
|---|
| 266 | * If parent path does not exits, create it. | 
|---|
| 267 | * @param path the directory path which is to be created. | 
|---|
| 268 | * @param permission directory permission. | 
|---|
| 269 | * @return return true if success. | 
|---|
| 270 | */ | 
|---|
| 271 |  | 
|---|
| 272 | bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) { | 
|---|
| 273 | if (!nn) { | 
|---|
| 274 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 275 | } | 
|---|
| 276 |  | 
|---|
| 277 | if (NULL == path || !strlen(path)) { | 
|---|
| 278 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 279 | } | 
|---|
| 280 |  | 
|---|
| 281 | return nn->mkdirs(getStandardPath(path), permission, true); | 
|---|
| 282 | } | 
|---|
| 283 |  | 
|---|
| 284 | /** | 
|---|
| 285 | * To get path information. | 
|---|
| 286 | * @param path the path which information is to be returned. | 
|---|
| 287 | * @return the path information. | 
|---|
| 288 | */ | 
|---|
| 289 | FileStatus FileSystemImpl::getFileStatus(const char * path) { | 
|---|
| 290 | if (!nn) { | 
|---|
| 291 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 292 | } | 
|---|
| 293 |  | 
|---|
| 294 | if (NULL == path || !strlen(path)) { | 
|---|
| 295 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 296 | } | 
|---|
| 297 |  | 
|---|
| 298 | return nn->getFileInfo(getStandardPath(path)); | 
|---|
| 299 | } | 
|---|
| 300 |  | 
|---|
| 301 | static void Convert(BlockLocation & bl, const LocatedBlock & lb) { | 
|---|
| 302 | const std::vector<DatanodeInfo> & nodes = lb.getLocations(); | 
|---|
| 303 | bl.setCorrupt(lb.isCorrupt()); | 
|---|
| 304 | bl.setLength(lb.getNumBytes()); | 
|---|
| 305 | bl.setOffset(lb.getOffset()); | 
|---|
| 306 | std::vector<std::string> hosts(nodes.size()); | 
|---|
| 307 | std::vector<std::string> names(nodes.size()); | 
|---|
| 308 | std::vector<std::string> topologyPaths(nodes.size()); | 
|---|
| 309 |  | 
|---|
| 310 | for (size_t i = 0 ; i < nodes.size() ; ++i) { | 
|---|
| 311 | hosts[i] = nodes[i].getHostName(); | 
|---|
| 312 | names[i] = nodes[i].getXferAddr(); | 
|---|
| 313 | topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr(); | 
|---|
| 314 | } | 
|---|
| 315 |  | 
|---|
| 316 | bl.setNames(names); | 
|---|
| 317 | bl.setHosts(hosts); | 
|---|
| 318 | bl.setTopologyPaths(topologyPaths); | 
|---|
| 319 | } | 
|---|
| 320 |  | 
|---|
| 321 | std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations( | 
|---|
| 322 | const char * path, int64_t start, int64_t len) { | 
|---|
| 323 | if (!nn) { | 
|---|
| 324 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 325 | } | 
|---|
| 326 |  | 
|---|
| 327 | if (NULL == path || !strlen(path)) { | 
|---|
| 328 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 329 | } | 
|---|
| 330 |  | 
|---|
| 331 | if (start < 0) { | 
|---|
| 332 | THROW(InvalidParameter, "Invalid input: start offset should be positive"); | 
|---|
| 333 | } | 
|---|
| 334 |  | 
|---|
| 335 | if (len < 0) { | 
|---|
| 336 | THROW(InvalidParameter, "Invalid input: length should be positive"); | 
|---|
| 337 | } | 
|---|
| 338 |  | 
|---|
| 339 | LocatedBlocksImpl lbs; | 
|---|
| 340 | nn->getBlockLocations(getStandardPath(path), start, len, lbs); | 
|---|
| 341 | std::vector<LocatedBlock> blocks = lbs.getBlocks(); | 
|---|
| 342 | std::vector<BlockLocation> retval(blocks.size()); | 
|---|
| 343 |  | 
|---|
| 344 | for (size_t i = 0; i < blocks.size(); ++i) { | 
|---|
| 345 | Convert(retval[i], blocks[i]); | 
|---|
| 346 | } | 
|---|
| 347 |  | 
|---|
| 348 | return retval; | 
|---|
| 349 | } | 
|---|
| 350 |  | 
|---|
| 351 | /** | 
|---|
| 352 | * list the contents of a directory. | 
|---|
| 353 | * @param path the directory path. | 
|---|
| 354 | * @return return the path informations in the given directory. | 
|---|
| 355 | */ | 
|---|
| 356 | DirectoryIterator FileSystemImpl::listDirectory(const char * path, | 
|---|
| 357 | bool needLocation) { | 
|---|
| 358 | if (!nn) { | 
|---|
| 359 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 360 | } | 
|---|
| 361 |  | 
|---|
| 362 | if (NULL == path || !strlen(path)) { | 
|---|
| 363 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 364 | } | 
|---|
| 365 |  | 
|---|
| 366 | return DirectoryIterator(this, getStandardPath(path), needLocation); | 
|---|
| 367 | } | 
|---|
| 368 |  | 
|---|
| 369 | /** | 
|---|
| 370 | * list all the contents of a directory. | 
|---|
| 371 | * @param path The directory path. | 
|---|
| 372 | * @return Return a vector of file informations in the directory. | 
|---|
| 373 | */ | 
|---|
| 374 | std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path, | 
|---|
| 375 | bool needLocation) { | 
|---|
| 376 | if (!nn) { | 
|---|
| 377 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 378 | } | 
|---|
| 379 |  | 
|---|
| 380 | if (NULL == path || !strlen(path)) { | 
|---|
| 381 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 382 | } | 
|---|
| 383 |  | 
|---|
| 384 | std::string startAfter; | 
|---|
| 385 | std::string p = getStandardPath(path); | 
|---|
| 386 | std::vector<FileStatus> retval; | 
|---|
| 387 |  | 
|---|
| 388 | while (getListing(p, startAfter, needLocation, retval)) { | 
|---|
| 389 | startAfter = retval.back().getPath(); | 
|---|
| 390 | } | 
|---|
| 391 |  | 
|---|
| 392 | return retval; | 
|---|
| 393 | } | 
|---|
| 394 |  | 
|---|
| 395 | /** | 
|---|
| 396 | * To set the owner and the group of the path. | 
|---|
| 397 | * username and groupname cannot be empty at the same time. | 
|---|
| 398 | * @param path the path which owner of group is to be changed. | 
|---|
| 399 | * @param username new user name. | 
|---|
| 400 | * @param groupname new group. | 
|---|
| 401 | */ | 
|---|
| 402 | void FileSystemImpl::setOwner(const char * path, const char * username, | 
|---|
| 403 | const char * groupname) { | 
|---|
| 404 | if (!nn) { | 
|---|
| 405 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 406 | } | 
|---|
| 407 |  | 
|---|
| 408 | if (NULL == path || !strlen(path)) { | 
|---|
| 409 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 410 | } | 
|---|
| 411 |  | 
|---|
| 412 | if ((NULL == username || !strlen(username)) | 
|---|
| 413 | && (NULL == groupname || !strlen(groupname))) { | 
|---|
| 414 | THROW(InvalidParameter, | 
|---|
| 415 | "Invalid input: username and groupname should not be empty"); | 
|---|
| 416 | } | 
|---|
| 417 |  | 
|---|
| 418 | nn->setOwner(getStandardPath(path), username != NULL ? username : "", | 
|---|
| 419 | groupname != NULL ? groupname : ""); | 
|---|
| 420 | } | 
|---|
| 421 |  | 
|---|
| 422 | /** | 
|---|
| 423 | * To set the access time or modification time of a path. | 
|---|
| 424 | * @param path the path which access time or modification time is to be changed. | 
|---|
| 425 | * @param mtime new modification time. | 
|---|
| 426 | * @param atime new access time. | 
|---|
| 427 | */ | 
|---|
| 428 | void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) { | 
|---|
| 429 | if (!nn) { | 
|---|
| 430 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 431 | } | 
|---|
| 432 |  | 
|---|
| 433 | if (NULL == path || !strlen(path)) { | 
|---|
| 434 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 435 | } | 
|---|
| 436 |  | 
|---|
| 437 | nn->setTimes(getStandardPath(path), mtime, atime); | 
|---|
| 438 | } | 
|---|
| 439 |  | 
|---|
| 440 | /** | 
|---|
| 441 | * To set the permission of a path. | 
|---|
| 442 | * @param path the path which permission is to be changed. | 
|---|
| 443 | * @param permission new permission. | 
|---|
| 444 | */ | 
|---|
| 445 | void FileSystemImpl::setPermission(const char * path, | 
|---|
| 446 | const Permission & permission) { | 
|---|
| 447 | if (!nn) { | 
|---|
| 448 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 449 | } | 
|---|
| 450 |  | 
|---|
| 451 | if (NULL == path || !strlen(path)) { | 
|---|
| 452 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 453 | } | 
|---|
| 454 |  | 
|---|
| 455 | nn->setPermission(getStandardPath(path), permission); | 
|---|
| 456 | } | 
|---|
| 457 |  | 
|---|
| 458 | /** | 
|---|
| 459 | * To set the number of replication. | 
|---|
| 460 | * @param path the path which number of replication is to be changed. | 
|---|
| 461 | * @param replication new number of replication. | 
|---|
| 462 | * @return return true if success. | 
|---|
| 463 | */ | 
|---|
| 464 |  | 
|---|
| 465 | bool FileSystemImpl::setReplication(const char * path, short replication) { | 
|---|
| 466 | if (!nn) { | 
|---|
| 467 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 468 | } | 
|---|
| 469 |  | 
|---|
| 470 | if (NULL == path || !strlen(path)) { | 
|---|
| 471 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 472 | } | 
|---|
| 473 |  | 
|---|
| 474 | return nn->setReplication(getStandardPath(path), replication); | 
|---|
| 475 | } | 
|---|
| 476 |  | 
|---|
| 477 | /** | 
|---|
| 478 | * To rename a path. | 
|---|
| 479 | * @param src old path. | 
|---|
| 480 | * @param dst new path. | 
|---|
| 481 | * @return return true if success. | 
|---|
| 482 | */ | 
|---|
| 483 |  | 
|---|
| 484 | bool FileSystemImpl::rename(const char * src, const char * dst) { | 
|---|
| 485 | if (!nn) { | 
|---|
| 486 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 487 | } | 
|---|
| 488 |  | 
|---|
| 489 | if (NULL == src || !strlen(src)) { | 
|---|
| 490 | THROW(InvalidParameter, "Invalid input: src should not be empty"); | 
|---|
| 491 | } | 
|---|
| 492 |  | 
|---|
| 493 | if (NULL == dst || !strlen(dst)) { | 
|---|
| 494 | THROW(InvalidParameter, "Invalid input: dst should not be empty"); | 
|---|
| 495 | } | 
|---|
| 496 |  | 
|---|
| 497 | return nn->rename(getStandardPath(src), getStandardPath(dst)); | 
|---|
| 498 | } | 
|---|
| 499 |  | 
|---|
| 500 | /** | 
|---|
| 501 | * To set working directory. | 
|---|
| 502 | * @param path new working directory. | 
|---|
| 503 | */ | 
|---|
| 504 | void FileSystemImpl::setWorkingDirectory(const char * path) { | 
|---|
| 505 | if (NULL == path) { | 
|---|
| 506 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 507 | } | 
|---|
| 508 |  | 
|---|
| 509 | if (!strlen(path) || '/' != path[0]) { | 
|---|
| 510 | THROW(InvalidParameter, | 
|---|
| 511 | "Invalid input: path should be an absolute path"); | 
|---|
| 512 | } | 
|---|
| 513 |  | 
|---|
| 514 | lock_guard<mutex> lock(mutWorkingDir); | 
|---|
| 515 | workingDir = path; | 
|---|
| 516 | } | 
|---|
| 517 |  | 
|---|
| 518 | /** | 
|---|
| 519 | * To get working directory. | 
|---|
| 520 | * @return working directory. | 
|---|
| 521 | */ | 
|---|
| 522 | std::string FileSystemImpl::getWorkingDirectory() const { | 
|---|
| 523 | return workingDir; | 
|---|
| 524 | } | 
|---|
| 525 |  | 
|---|
| 526 | /** | 
|---|
| 527 | * To test if the path exist. | 
|---|
| 528 | * @param path the path which is to be tested. | 
|---|
| 529 | * @return return true if the path exist. | 
|---|
| 530 | */ | 
|---|
| 531 |  | 
|---|
| 532 | bool FileSystemImpl::exist(const char * path) { | 
|---|
| 533 | if (!nn) { | 
|---|
| 534 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 535 | } | 
|---|
| 536 |  | 
|---|
| 537 | if (NULL == path || !strlen(path)) { | 
|---|
| 538 | THROW(InvalidParameter, "Invalid input: path should not be empty"); | 
|---|
| 539 | } | 
|---|
| 540 |  | 
|---|
| 541 | try { | 
|---|
| 542 | getFileStatus(path); | 
|---|
| 543 | } catch (const FileNotFoundException & e) { | 
|---|
| 544 | return false; | 
|---|
| 545 | } | 
|---|
| 546 |  | 
|---|
| 547 | return true; | 
|---|
| 548 | } | 
|---|
| 549 |  | 
|---|
| 550 | /** | 
|---|
| 551 | * To get the file system status. | 
|---|
| 552 | * @return the file system status. | 
|---|
| 553 | */ | 
|---|
| 554 | FileSystemStats FileSystemImpl::getFsStats() { | 
|---|
| 555 | if (!nn) { | 
|---|
| 556 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 557 | } | 
|---|
| 558 |  | 
|---|
| 559 | std::vector<int64_t> retval = nn->getFsStats(); | 
|---|
| 560 | assert(retval.size() >= 3); | 
|---|
| 561 | return FileSystemStats(retval[0], retval[1], retval[2]); | 
|---|
| 562 | } | 
|---|
| 563 |  | 
|---|
| 564 | /** | 
|---|
| 565 | * Truncate the file in the indicated path to the indicated size. | 
|---|
| 566 | * @param path The path to the file to be truncated | 
|---|
| 567 | * @param size The size the file is to be truncated to | 
|---|
| 568 | * | 
|---|
| 569 | * @return true if and client does not need to wait for block recovery, | 
|---|
| 570 | * false if client needs to wait for block recovery. | 
|---|
| 571 | */ | 
|---|
| 572 | bool FileSystemImpl::truncate(const char * path, int64_t size) { | 
|---|
| 573 | LOG(DEBUG1, "truncate file %s to length %"PRId64, path, size); | 
|---|
| 574 |  | 
|---|
| 575 | if (!nn) { | 
|---|
| 576 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 577 | } | 
|---|
| 578 |  | 
|---|
| 579 | if (NULL == path || !strlen(path)) { | 
|---|
| 580 | THROW(InvalidParameter, "Invalid input: src should not be empty."); | 
|---|
| 581 | } | 
|---|
| 582 |  | 
|---|
| 583 | std::string absPath = getStandardPath(path); | 
|---|
| 584 |  | 
|---|
| 585 | return nn->truncate(absPath, size, clientName); | 
|---|
| 586 | } | 
|---|
| 587 |  | 
|---|
| 588 | std::string FileSystemImpl::getDelegationToken(const char * renewer) { | 
|---|
| 589 | if (!nn) { | 
|---|
| 590 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 591 | } | 
|---|
| 592 |  | 
|---|
| 593 | if (NULL == renewer || !strlen(renewer)) { | 
|---|
| 594 | THROW(InvalidParameter, "Invalid input: renewer should not be empty."); | 
|---|
| 595 | } | 
|---|
| 596 |  | 
|---|
| 597 | Token retval = nn->getDelegationToken(renewer); | 
|---|
| 598 | retval.setService(tokenService); | 
|---|
| 599 | return retval.toString(); | 
|---|
| 600 | } | 
|---|
| 601 |  | 
|---|
| 602 | std::string FileSystemImpl::getDelegationToken() { | 
|---|
| 603 | return getDelegationToken(key.getUser().getPrincipal().c_str()); | 
|---|
| 604 | } | 
|---|
| 605 |  | 
|---|
| 606 | int64_t FileSystemImpl::renewDelegationToken(const std::string & token) { | 
|---|
| 607 | if (!nn) { | 
|---|
| 608 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 609 | } | 
|---|
| 610 |  | 
|---|
| 611 | Token t; | 
|---|
| 612 | t.fromString(token); | 
|---|
| 613 | return  nn->renewDelegationToken(t); | 
|---|
| 614 | } | 
|---|
| 615 |  | 
|---|
| 616 | void FileSystemImpl::cancelDelegationToken(const std::string & token) { | 
|---|
| 617 | if (!nn) { | 
|---|
| 618 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 619 | } | 
|---|
| 620 |  | 
|---|
| 621 | Token t; | 
|---|
| 622 | t.fromString(token); | 
|---|
| 623 | nn->cancelDelegationToken(t); | 
|---|
| 624 | } | 
|---|
| 625 |  | 
|---|
| 626 | void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset, | 
|---|
| 627 | int64_t length, LocatedBlocks & lbs) { | 
|---|
| 628 | if (!nn) { | 
|---|
| 629 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 630 | } | 
|---|
| 631 |  | 
|---|
| 632 | nn->getBlockLocations(src, offset, length, lbs); | 
|---|
| 633 | } | 
|---|
| 634 |  | 
|---|
| 635 | void FileSystemImpl::create(const std::string & src, const Permission & masked, | 
|---|
| 636 | int flag, bool createParent, short replication, int64_t blockSize) { | 
|---|
| 637 | if (!nn) { | 
|---|
| 638 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 639 | } | 
|---|
| 640 |  | 
|---|
| 641 | nn->create(src, masked, clientName, flag, createParent, replication, | 
|---|
| 642 | blockSize); | 
|---|
| 643 | } | 
|---|
| 644 |  | 
|---|
| 645 | std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > | 
|---|
| 646 | FileSystemImpl::append(const std::string& src) { | 
|---|
| 647 | if (!nn) { | 
|---|
| 648 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 649 | } | 
|---|
| 650 |  | 
|---|
| 651 | return nn->append(src, clientName); | 
|---|
| 652 | } | 
|---|
| 653 |  | 
|---|
| 654 | void FileSystemImpl::abandonBlock(const ExtendedBlock & b, | 
|---|
| 655 | const std::string & src) { | 
|---|
| 656 | if (!nn) { | 
|---|
| 657 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 658 | } | 
|---|
| 659 |  | 
|---|
| 660 | nn->abandonBlock(b, src, clientName); | 
|---|
| 661 | } | 
|---|
| 662 |  | 
|---|
| 663 | shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src, | 
|---|
| 664 | const ExtendedBlock * previous, | 
|---|
| 665 | const std::vector<DatanodeInfo> & excludeNodes) { | 
|---|
| 666 | if (!nn) { | 
|---|
| 667 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 668 | } | 
|---|
| 669 |  | 
|---|
| 670 | return nn->addBlock(src, clientName, previous, excludeNodes); | 
|---|
| 671 | } | 
|---|
| 672 |  | 
|---|
| 673 | shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode( | 
|---|
| 674 | const std::string & src, const ExtendedBlock & blk, | 
|---|
| 675 | const std::vector<DatanodeInfo> & existings, | 
|---|
| 676 | const std::vector<std::string> & storageIDs, | 
|---|
| 677 | const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) { | 
|---|
| 678 | if (!nn) { | 
|---|
| 679 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 680 | } | 
|---|
| 681 |  | 
|---|
| 682 | return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes, | 
|---|
| 683 | numAdditionalNodes, clientName); | 
|---|
| 684 | } | 
|---|
| 685 |  | 
|---|
| 686 | bool FileSystemImpl::complete(const std::string & src, | 
|---|
| 687 | const ExtendedBlock * last) { | 
|---|
| 688 | if (!nn) { | 
|---|
| 689 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 690 | } | 
|---|
| 691 |  | 
|---|
| 692 | return nn->complete(src, clientName, last); | 
|---|
| 693 | } | 
|---|
| 694 |  | 
|---|
| 695 | /*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) { | 
|---|
| 696 | if (!nn) { | 
|---|
| 697 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 698 | } | 
|---|
| 699 |  | 
|---|
| 700 | nn->reportBadBlocks(blocks); | 
|---|
| 701 | }*/ | 
|---|
| 702 |  | 
|---|
| 703 | void FileSystemImpl::fsync(const std::string & src) { | 
|---|
| 704 | if (!nn) { | 
|---|
| 705 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 706 | } | 
|---|
| 707 |  | 
|---|
| 708 | nn->fsync(src, clientName); | 
|---|
| 709 | } | 
|---|
| 710 |  | 
|---|
| 711 | shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline( | 
|---|
| 712 | const ExtendedBlock & block) { | 
|---|
| 713 | if (!nn) { | 
|---|
| 714 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 715 | } | 
|---|
| 716 |  | 
|---|
| 717 | return nn->updateBlockForPipeline(block, clientName); | 
|---|
| 718 | } | 
|---|
| 719 |  | 
|---|
| 720 | void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock, | 
|---|
| 721 | const ExtendedBlock & newBlock, | 
|---|
| 722 | const std::vector<DatanodeInfo> & newNodes, | 
|---|
| 723 | const std::vector<std::string> & storageIDs) { | 
|---|
| 724 | if (!nn) { | 
|---|
| 725 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 726 | } | 
|---|
| 727 |  | 
|---|
| 728 | nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs); | 
|---|
| 729 | } | 
|---|
| 730 |  | 
|---|
| 731 | bool FileSystemImpl::getListing(const std::string & src, | 
|---|
| 732 | const std::string & startAfter, bool needLocation, | 
|---|
| 733 | std::vector<FileStatus> & dl) { | 
|---|
| 734 | if (!nn) { | 
|---|
| 735 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 736 | } | 
|---|
| 737 |  | 
|---|
| 738 | return nn->getListing(src, startAfter, needLocation, dl); | 
|---|
| 739 | } | 
|---|
| 740 |  | 
|---|
| 741 | bool FileSystemImpl::renewLease() { | 
|---|
| 742 | if (!nn) { | 
|---|
| 743 | THROW(HdfsIOException, "FileSystemImpl: not connected."); | 
|---|
| 744 | } | 
|---|
| 745 |  | 
|---|
| 746 | //protected by LeaseRenewer's lock | 
|---|
| 747 | if (0 == openedOutputStream) { | 
|---|
| 748 | return false; | 
|---|
| 749 | } | 
|---|
| 750 |  | 
|---|
| 751 | try { | 
|---|
| 752 | nn->renewLease(clientName); | 
|---|
| 753 | return true; | 
|---|
| 754 | } catch (const HdfsException & e) { | 
|---|
| 755 | std::string buffer; | 
|---|
| 756 | LOG(LOG_ERROR, | 
|---|
| 757 | "Failed to renew lease for filesystem which client name is %s, since:\n%s", | 
|---|
| 758 | getClientName(), GetExceptionDetail(e, buffer)); | 
|---|
| 759 | } catch (const std::exception & e) { | 
|---|
| 760 | LOG(LOG_ERROR, | 
|---|
| 761 | "Failed to renew lease for filesystem which client name is %s, since:\n%s", | 
|---|
| 762 | getClientName(), e.what()); | 
|---|
| 763 | } | 
|---|
| 764 |  | 
|---|
| 765 | return false; | 
|---|
| 766 | } | 
|---|
| 767 |  | 
|---|
| 768 | void FileSystemImpl::registerOpenedOutputStream() { | 
|---|
| 769 | //protected by LeaseRenewer's lock | 
|---|
| 770 | ++openedOutputStream; | 
|---|
| 771 | } | 
|---|
| 772 |  | 
|---|
| 773 | bool FileSystemImpl::unregisterOpenedOutputStream() { | 
|---|
| 774 | //protected by LeaseRenewer's lock | 
|---|
| 775 | if (openedOutputStream > 0) { | 
|---|
| 776 | --openedOutputStream; | 
|---|
| 777 | } | 
|---|
| 778 |  | 
|---|
| 779 | return  openedOutputStream == 0; | 
|---|
| 780 | } | 
|---|
| 781 |  | 
|---|
| 782 | } | 
|---|
| 783 | } | 
|---|
| 784 |  | 
|---|