1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "Atomic.h" |
29 | #include "BlockLocation.h" |
30 | #include "DirectoryIterator.h" |
31 | #include "Exception.h" |
32 | #include "ExceptionInternal.h" |
33 | #include "FileStatus.h" |
34 | #include "FileSystemImpl.h" |
35 | #include "FileSystemStats.h" |
36 | #include "InputStream.h" |
37 | #include "LeaseRenewer.h" |
38 | #include "Logger.h" |
39 | #include "OutputStream.h" |
40 | #include "OutputStreamImpl.h" |
41 | #include "server/LocatedBlocks.h" |
42 | #include "server/NamenodeInfo.h" |
43 | #include "server/NamenodeProxy.h" |
44 | #include "StringUtil.h" |
45 | |
46 | #include <cstring> |
47 | #include <inttypes.h> |
48 | #include <libxml/uri.h> |
49 | #include <strings.h> |
50 | |
51 | namespace Hdfs { |
52 | namespace Internal { |
53 | |
/*
 * Resolve PATH against PREFIX.
 * An empty PATH yields PREFIX itself, a PATH starting with '/' is returned
 * unchanged, and anything else is appended to PREFIX after a single '/'.
 * No normalization is performed here.
 */
static const std::string GetAbsPath(const std::string & prefix,
                                    const std::string & path) {
    if (path.empty()) {
        return prefix;
    }

    const bool alreadyAbsolute = (path[0] == '/');

    if (alreadyAbsolute) {
        return path;
    }

    std::string joined(prefix);
    joined += "/";
    joined += path;
    return joined;
}
66 | |
67 | /* |
68 | * Return the canonical absolute name of file NAME. |
69 | * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/') |
70 | */ |
71 | static const std::string CanonicalizePath(const std::string & path) { |
72 | int skip = 0; |
73 | std::string retval; |
74 | std::vector<std::string> components = StringSplit(path, "/" ); |
75 | std::deque<std::string> tmp; |
76 | std::vector<std::string>::reverse_iterator s = components.rbegin(); |
77 | |
78 | while (s != components.rend()) { |
79 | if (s->empty() || *s == "." ) { |
80 | ++s; |
81 | } else if (*s == ".." ) { |
82 | ++skip; |
83 | ++s; |
84 | } else { |
85 | if (skip <= 0) { |
86 | tmp.push_front(*s); |
87 | } else { |
88 | --skip; |
89 | } |
90 | |
91 | ++s; |
92 | } |
93 | } |
94 | |
95 | for (size_t i = 0; i < tmp.size(); ++i) { |
96 | retval += "/" ; |
97 | retval += tmp[i]; |
98 | } |
99 | |
100 | return retval.empty() ? "/" : retval; |
101 | } |
102 | |
103 | FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c) |
104 | : conf(c), |
105 | key(key), |
106 | openedOutputStream(0), |
107 | nn(NULL), |
108 | sconf(c), |
109 | user(key.getUser()) { |
110 | static atomic<uint32_t> count(0); |
111 | std::stringstream ss; |
112 | ss.imbue(std::locale::classic()); |
113 | srand((unsigned int) time(NULL)); |
114 | ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_" |
115 | << getpid() << "_tid_" << pthread_self(); |
116 | clientName = ss.str(); |
117 | workingDir = std::string("/user/" ) + user.getEffectiveUser(); |
118 | peerCache = shared_ptr<PeerCache>(new PeerCache(sconf)); |
119 | #ifdef MOCK |
120 | stub = NULL; |
121 | #endif |
122 | //set log level |
123 | RootLogger.setLogSeverity(sconf.getLogSeverity()); |
124 | } |
125 | |
126 | /** |
127 | * Destroy a FileSystemBase instance |
128 | */ |
129 | FileSystemImpl::~FileSystemImpl() { |
130 | try { |
131 | disconnect(); |
132 | } catch (...) { |
133 | } |
134 | } |
135 | |
136 | const std::string FileSystemImpl::getStandardPath(const char * path) { |
137 | std::string base; |
138 | { |
139 | lock_guard<mutex> lock(mutWorkingDir); |
140 | base = workingDir; |
141 | } |
142 | return CanonicalizePath(GetAbsPath(base, path)); |
143 | } |
144 | |
145 | const char * FileSystemImpl::getClientName() { |
146 | return clientName.c_str(); |
147 | } |
148 | |
149 | void FileSystemImpl::connect() { |
150 | std::string host, port, uri; |
151 | std::vector<NamenodeInfo> namenodeInfos; |
152 | |
153 | if (nn) { |
154 | THROW(HdfsIOException, "FileSystemImpl: already connected." ); |
155 | } |
156 | |
157 | host = key.getHost(); |
158 | port = key.getPort(); |
159 | uri += key.getScheme() + "://" + host; |
160 | |
161 | if (port.empty()) { |
162 | try { |
163 | namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf); |
164 | } catch (const HdfsConfigNotFound & e) { |
165 | NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration" , uri.c_str()); |
166 | } |
167 | |
168 | tokenService = "ha-hdfs:" ; |
169 | tokenService += host; |
170 | } else { |
171 | std::stringstream ss; |
172 | ss.imbue(std::locale::classic()); |
173 | ss << host << ":" << port; |
174 | namenodeInfos.resize(1); |
175 | namenodeInfos[0].setRpcAddr(ss.str()); |
176 | tokenService = namenodeInfos[0].getRpcAddr(); |
177 | } |
178 | |
179 | #ifdef MOCK |
180 | nn = stub->getNamenode(); |
181 | #else |
182 | nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod()))); |
183 | #endif |
184 | /* |
185 | * To test if the connection is ok |
186 | */ |
187 | getFsStats(); |
188 | } |
189 | |
190 | /** |
191 | * disconnect from hdfs |
192 | */ |
193 | void FileSystemImpl::disconnect() { |
194 | if (nn) { |
195 | nn->close(); |
196 | delete nn; |
197 | } |
198 | |
199 | nn = NULL; |
200 | } |
201 | |
202 | /** |
203 | * To get default number of replication. |
204 | * @return the default number of replication. |
205 | */ |
206 | int FileSystemImpl::getDefaultReplication() const { |
207 | return sconf.getDefaultReplica(); |
208 | } |
209 | |
210 | /** |
211 | * To get the default number of block size. |
212 | * @return the default block size. |
213 | */ |
214 | int64_t FileSystemImpl::getDefaultBlockSize() const { |
215 | return sconf.getDefaultBlockSize(); |
216 | } |
217 | |
218 | /** |
219 | * To get the home directory. |
220 | * @return home directory. |
221 | */ |
222 | std::string FileSystemImpl::getHomeDirectory() const { |
223 | return std::string("/user/" ) + user.getEffectiveUser(); |
224 | } |
225 | |
226 | /** |
227 | * To delete a file or directory. |
228 | * @param path the path to be deleted. |
229 | * @param recursive if path is a directory, delete the contents recursively. |
230 | * @return return true if success. |
231 | */ |
232 | |
233 | bool FileSystemImpl::deletePath(const char * path, bool recursive) { |
234 | if (!nn) { |
235 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
236 | } |
237 | |
238 | if (NULL == path || !strlen(path)) { |
239 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
240 | } |
241 | |
242 | return nn->deleteFile(getStandardPath(path), recursive); |
243 | } |
244 | |
245 | /** |
246 | * To create a directory which given permission. |
247 | * @param path the directory path which is to be created. |
248 | * @param permission directory permission. |
249 | * @return return true if success. |
250 | */ |
251 | |
252 | bool FileSystemImpl::mkdir(const char * path, const Permission & permission) { |
253 | if (!nn) { |
254 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
255 | } |
256 | |
257 | if (NULL == path || !strlen(path)) { |
258 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
259 | } |
260 | |
261 | return nn->mkdirs(getStandardPath(path), permission, false); |
262 | } |
263 | |
264 | /** |
265 | * To create a directory which given permission. |
266 | * If parent path does not exits, create it. |
267 | * @param path the directory path which is to be created. |
268 | * @param permission directory permission. |
269 | * @return return true if success. |
270 | */ |
271 | |
272 | bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) { |
273 | if (!nn) { |
274 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
275 | } |
276 | |
277 | if (NULL == path || !strlen(path)) { |
278 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
279 | } |
280 | |
281 | return nn->mkdirs(getStandardPath(path), permission, true); |
282 | } |
283 | |
284 | /** |
285 | * To get path information. |
286 | * @param path the path which information is to be returned. |
287 | * @return the path information. |
288 | */ |
289 | FileStatus FileSystemImpl::getFileStatus(const char * path) { |
290 | if (!nn) { |
291 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
292 | } |
293 | |
294 | if (NULL == path || !strlen(path)) { |
295 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
296 | } |
297 | |
298 | return nn->getFileInfo(getStandardPath(path)); |
299 | } |
300 | |
301 | static void Convert(BlockLocation & bl, const LocatedBlock & lb) { |
302 | const std::vector<DatanodeInfo> & nodes = lb.getLocations(); |
303 | bl.setCorrupt(lb.isCorrupt()); |
304 | bl.setLength(lb.getNumBytes()); |
305 | bl.setOffset(lb.getOffset()); |
306 | std::vector<std::string> hosts(nodes.size()); |
307 | std::vector<std::string> names(nodes.size()); |
308 | std::vector<std::string> topologyPaths(nodes.size()); |
309 | |
310 | for (size_t i = 0 ; i < nodes.size() ; ++i) { |
311 | hosts[i] = nodes[i].getHostName(); |
312 | names[i] = nodes[i].getXferAddr(); |
313 | topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr(); |
314 | } |
315 | |
316 | bl.setNames(names); |
317 | bl.setHosts(hosts); |
318 | bl.setTopologyPaths(topologyPaths); |
319 | } |
320 | |
321 | std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations( |
322 | const char * path, int64_t start, int64_t len) { |
323 | if (!nn) { |
324 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
325 | } |
326 | |
327 | if (NULL == path || !strlen(path)) { |
328 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
329 | } |
330 | |
331 | if (start < 0) { |
332 | THROW(InvalidParameter, "Invalid input: start offset should be positive" ); |
333 | } |
334 | |
335 | if (len < 0) { |
336 | THROW(InvalidParameter, "Invalid input: length should be positive" ); |
337 | } |
338 | |
339 | LocatedBlocksImpl lbs; |
340 | nn->getBlockLocations(getStandardPath(path), start, len, lbs); |
341 | std::vector<LocatedBlock> blocks = lbs.getBlocks(); |
342 | std::vector<BlockLocation> retval(blocks.size()); |
343 | |
344 | for (size_t i = 0; i < blocks.size(); ++i) { |
345 | Convert(retval[i], blocks[i]); |
346 | } |
347 | |
348 | return retval; |
349 | } |
350 | |
351 | /** |
352 | * list the contents of a directory. |
353 | * @param path the directory path. |
354 | * @return return the path informations in the given directory. |
355 | */ |
356 | DirectoryIterator FileSystemImpl::listDirectory(const char * path, |
357 | bool needLocation) { |
358 | if (!nn) { |
359 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
360 | } |
361 | |
362 | if (NULL == path || !strlen(path)) { |
363 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
364 | } |
365 | |
366 | return DirectoryIterator(this, getStandardPath(path), needLocation); |
367 | } |
368 | |
369 | /** |
370 | * list all the contents of a directory. |
371 | * @param path The directory path. |
372 | * @return Return a vector of file informations in the directory. |
373 | */ |
374 | std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path, |
375 | bool needLocation) { |
376 | if (!nn) { |
377 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
378 | } |
379 | |
380 | if (NULL == path || !strlen(path)) { |
381 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
382 | } |
383 | |
384 | std::string startAfter; |
385 | std::string p = getStandardPath(path); |
386 | std::vector<FileStatus> retval; |
387 | |
388 | while (getListing(p, startAfter, needLocation, retval)) { |
389 | startAfter = retval.back().getPath(); |
390 | } |
391 | |
392 | return retval; |
393 | } |
394 | |
395 | /** |
396 | * To set the owner and the group of the path. |
397 | * username and groupname cannot be empty at the same time. |
398 | * @param path the path which owner of group is to be changed. |
399 | * @param username new user name. |
400 | * @param groupname new group. |
401 | */ |
402 | void FileSystemImpl::setOwner(const char * path, const char * username, |
403 | const char * groupname) { |
404 | if (!nn) { |
405 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
406 | } |
407 | |
408 | if (NULL == path || !strlen(path)) { |
409 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
410 | } |
411 | |
412 | if ((NULL == username || !strlen(username)) |
413 | && (NULL == groupname || !strlen(groupname))) { |
414 | THROW(InvalidParameter, |
415 | "Invalid input: username and groupname should not be empty" ); |
416 | } |
417 | |
418 | nn->setOwner(getStandardPath(path), username != NULL ? username : "" , |
419 | groupname != NULL ? groupname : "" ); |
420 | } |
421 | |
422 | /** |
423 | * To set the access time or modification time of a path. |
424 | * @param path the path which access time or modification time is to be changed. |
425 | * @param mtime new modification time. |
426 | * @param atime new access time. |
427 | */ |
428 | void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) { |
429 | if (!nn) { |
430 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
431 | } |
432 | |
433 | if (NULL == path || !strlen(path)) { |
434 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
435 | } |
436 | |
437 | nn->setTimes(getStandardPath(path), mtime, atime); |
438 | } |
439 | |
440 | /** |
441 | * To set the permission of a path. |
442 | * @param path the path which permission is to be changed. |
443 | * @param permission new permission. |
444 | */ |
445 | void FileSystemImpl::setPermission(const char * path, |
446 | const Permission & permission) { |
447 | if (!nn) { |
448 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
449 | } |
450 | |
451 | if (NULL == path || !strlen(path)) { |
452 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
453 | } |
454 | |
455 | nn->setPermission(getStandardPath(path), permission); |
456 | } |
457 | |
458 | /** |
459 | * To set the number of replication. |
460 | * @param path the path which number of replication is to be changed. |
461 | * @param replication new number of replication. |
462 | * @return return true if success. |
463 | */ |
464 | |
465 | bool FileSystemImpl::setReplication(const char * path, short replication) { |
466 | if (!nn) { |
467 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
468 | } |
469 | |
470 | if (NULL == path || !strlen(path)) { |
471 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
472 | } |
473 | |
474 | return nn->setReplication(getStandardPath(path), replication); |
475 | } |
476 | |
477 | /** |
478 | * To rename a path. |
479 | * @param src old path. |
480 | * @param dst new path. |
481 | * @return return true if success. |
482 | */ |
483 | |
484 | bool FileSystemImpl::rename(const char * src, const char * dst) { |
485 | if (!nn) { |
486 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
487 | } |
488 | |
489 | if (NULL == src || !strlen(src)) { |
490 | THROW(InvalidParameter, "Invalid input: src should not be empty" ); |
491 | } |
492 | |
493 | if (NULL == dst || !strlen(dst)) { |
494 | THROW(InvalidParameter, "Invalid input: dst should not be empty" ); |
495 | } |
496 | |
497 | return nn->rename(getStandardPath(src), getStandardPath(dst)); |
498 | } |
499 | |
500 | /** |
501 | * To set working directory. |
502 | * @param path new working directory. |
503 | */ |
504 | void FileSystemImpl::setWorkingDirectory(const char * path) { |
505 | if (NULL == path) { |
506 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
507 | } |
508 | |
509 | if (!strlen(path) || '/' != path[0]) { |
510 | THROW(InvalidParameter, |
511 | "Invalid input: path should be an absolute path" ); |
512 | } |
513 | |
514 | lock_guard<mutex> lock(mutWorkingDir); |
515 | workingDir = path; |
516 | } |
517 | |
518 | /** |
519 | * To get working directory. |
520 | * @return working directory. |
521 | */ |
522 | std::string FileSystemImpl::getWorkingDirectory() const { |
523 | return workingDir; |
524 | } |
525 | |
526 | /** |
527 | * To test if the path exist. |
528 | * @param path the path which is to be tested. |
529 | * @return return true if the path exist. |
530 | */ |
531 | |
532 | bool FileSystemImpl::exist(const char * path) { |
533 | if (!nn) { |
534 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
535 | } |
536 | |
537 | if (NULL == path || !strlen(path)) { |
538 | THROW(InvalidParameter, "Invalid input: path should not be empty" ); |
539 | } |
540 | |
541 | try { |
542 | getFileStatus(path); |
543 | } catch (const FileNotFoundException & e) { |
544 | return false; |
545 | } |
546 | |
547 | return true; |
548 | } |
549 | |
550 | /** |
551 | * To get the file system status. |
552 | * @return the file system status. |
553 | */ |
554 | FileSystemStats FileSystemImpl::getFsStats() { |
555 | if (!nn) { |
556 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
557 | } |
558 | |
559 | std::vector<int64_t> retval = nn->getFsStats(); |
560 | assert(retval.size() >= 3); |
561 | return FileSystemStats(retval[0], retval[1], retval[2]); |
562 | } |
563 | |
564 | /** |
565 | * Truncate the file in the indicated path to the indicated size. |
566 | * @param path The path to the file to be truncated |
567 | * @param size The size the file is to be truncated to |
568 | * |
569 | * @return true if and client does not need to wait for block recovery, |
570 | * false if client needs to wait for block recovery. |
571 | */ |
572 | bool FileSystemImpl::truncate(const char * path, int64_t size) { |
573 | LOG(DEBUG1, "truncate file %s to length %" PRId64, path, size); |
574 | |
575 | if (!nn) { |
576 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
577 | } |
578 | |
579 | if (NULL == path || !strlen(path)) { |
580 | THROW(InvalidParameter, "Invalid input: src should not be empty." ); |
581 | } |
582 | |
583 | std::string absPath = getStandardPath(path); |
584 | |
585 | return nn->truncate(absPath, size, clientName); |
586 | } |
587 | |
588 | std::string FileSystemImpl::getDelegationToken(const char * renewer) { |
589 | if (!nn) { |
590 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
591 | } |
592 | |
593 | if (NULL == renewer || !strlen(renewer)) { |
594 | THROW(InvalidParameter, "Invalid input: renewer should not be empty." ); |
595 | } |
596 | |
597 | Token retval = nn->getDelegationToken(renewer); |
598 | retval.setService(tokenService); |
599 | return retval.toString(); |
600 | } |
601 | |
602 | std::string FileSystemImpl::getDelegationToken() { |
603 | return getDelegationToken(key.getUser().getPrincipal().c_str()); |
604 | } |
605 | |
606 | int64_t FileSystemImpl::renewDelegationToken(const std::string & token) { |
607 | if (!nn) { |
608 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
609 | } |
610 | |
611 | Token t; |
612 | t.fromString(token); |
613 | return nn->renewDelegationToken(t); |
614 | } |
615 | |
616 | void FileSystemImpl::cancelDelegationToken(const std::string & token) { |
617 | if (!nn) { |
618 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
619 | } |
620 | |
621 | Token t; |
622 | t.fromString(token); |
623 | nn->cancelDelegationToken(t); |
624 | } |
625 | |
626 | void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset, |
627 | int64_t length, LocatedBlocks & lbs) { |
628 | if (!nn) { |
629 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
630 | } |
631 | |
632 | nn->getBlockLocations(src, offset, length, lbs); |
633 | } |
634 | |
635 | void FileSystemImpl::create(const std::string & src, const Permission & masked, |
636 | int flag, bool createParent, short replication, int64_t blockSize) { |
637 | if (!nn) { |
638 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
639 | } |
640 | |
641 | nn->create(src, masked, clientName, flag, createParent, replication, |
642 | blockSize); |
643 | } |
644 | |
645 | std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > |
646 | FileSystemImpl::append(const std::string& src) { |
647 | if (!nn) { |
648 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
649 | } |
650 | |
651 | return nn->append(src, clientName); |
652 | } |
653 | |
654 | void FileSystemImpl::abandonBlock(const ExtendedBlock & b, |
655 | const std::string & src) { |
656 | if (!nn) { |
657 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
658 | } |
659 | |
660 | nn->abandonBlock(b, src, clientName); |
661 | } |
662 | |
663 | shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src, |
664 | const ExtendedBlock * previous, |
665 | const std::vector<DatanodeInfo> & excludeNodes) { |
666 | if (!nn) { |
667 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
668 | } |
669 | |
670 | return nn->addBlock(src, clientName, previous, excludeNodes); |
671 | } |
672 | |
673 | shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode( |
674 | const std::string & src, const ExtendedBlock & blk, |
675 | const std::vector<DatanodeInfo> & existings, |
676 | const std::vector<std::string> & storageIDs, |
677 | const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) { |
678 | if (!nn) { |
679 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
680 | } |
681 | |
682 | return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes, |
683 | numAdditionalNodes, clientName); |
684 | } |
685 | |
686 | bool FileSystemImpl::complete(const std::string & src, |
687 | const ExtendedBlock * last) { |
688 | if (!nn) { |
689 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
690 | } |
691 | |
692 | return nn->complete(src, clientName, last); |
693 | } |
694 | |
695 | /*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) { |
696 | if (!nn) { |
697 | THROW(HdfsIOException, "FileSystemImpl: not connected."); |
698 | } |
699 | |
700 | nn->reportBadBlocks(blocks); |
701 | }*/ |
702 | |
703 | void FileSystemImpl::fsync(const std::string & src) { |
704 | if (!nn) { |
705 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
706 | } |
707 | |
708 | nn->fsync(src, clientName); |
709 | } |
710 | |
711 | shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline( |
712 | const ExtendedBlock & block) { |
713 | if (!nn) { |
714 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
715 | } |
716 | |
717 | return nn->updateBlockForPipeline(block, clientName); |
718 | } |
719 | |
720 | void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock, |
721 | const ExtendedBlock & newBlock, |
722 | const std::vector<DatanodeInfo> & newNodes, |
723 | const std::vector<std::string> & storageIDs) { |
724 | if (!nn) { |
725 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
726 | } |
727 | |
728 | nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs); |
729 | } |
730 | |
731 | bool FileSystemImpl::getListing(const std::string & src, |
732 | const std::string & startAfter, bool needLocation, |
733 | std::vector<FileStatus> & dl) { |
734 | if (!nn) { |
735 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
736 | } |
737 | |
738 | return nn->getListing(src, startAfter, needLocation, dl); |
739 | } |
740 | |
741 | bool FileSystemImpl::renewLease() { |
742 | if (!nn) { |
743 | THROW(HdfsIOException, "FileSystemImpl: not connected." ); |
744 | } |
745 | |
746 | //protected by LeaseRenewer's lock |
747 | if (0 == openedOutputStream) { |
748 | return false; |
749 | } |
750 | |
751 | try { |
752 | nn->renewLease(clientName); |
753 | return true; |
754 | } catch (const HdfsException & e) { |
755 | std::string buffer; |
756 | LOG(LOG_ERROR, |
757 | "Failed to renew lease for filesystem which client name is %s, since:\n%s" , |
758 | getClientName(), GetExceptionDetail(e, buffer)); |
759 | } catch (const std::exception & e) { |
760 | LOG(LOG_ERROR, |
761 | "Failed to renew lease for filesystem which client name is %s, since:\n%s" , |
762 | getClientName(), e.what()); |
763 | } |
764 | |
765 | return false; |
766 | } |
767 | |
768 | void FileSystemImpl::registerOpenedOutputStream() { |
769 | //protected by LeaseRenewer's lock |
770 | ++openedOutputStream; |
771 | } |
772 | |
773 | bool FileSystemImpl::unregisterOpenedOutputStream() { |
774 | //protected by LeaseRenewer's lock |
775 | if (openedOutputStream > 0) { |
776 | --openedOutputStream; |
777 | } |
778 | |
779 | return openedOutputStream == 0; |
780 | } |
781 | |
782 | } |
783 | } |
784 | |