/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "Exception.h"
#include "ExceptionInternal.h"
#include "FileSystemInter.h"
#include "InputStreamImpl.h"
#include "InputStreamInter.h"
#include "LocalBlockReader.h"
#include "Logger.h"
#include "RemoteBlockReader.h"
#include "server/Datanode.h"
#include "Thread.h"

#include <algorithm>
#include <arpa/inet.h>
#include <cerrno>
#include <cstring>
#include <ifaddrs.h>
#include <inttypes.h>
#include <iostream>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

namespace Hdfs {
namespace Internal {

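/**
 * Collect the textual addresses of all local network interfaces plus the
 * local hostname, so that a datanode can later be checked for locality.
 * @return a set containing the local IPv4/IPv6 addresses and the hostname.
 */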
unordered_set<std::string> BuildLocalAddrSet() {
    unordered_set<std::string> set;
    struct ifaddrs * ifAddr = NULL;
    struct ifaddrs * pifAddr = NULL;
    struct sockaddr * addr;

    if (getifaddrs(&ifAddr)) {
        THROW(HdfsNetworkException,
              "InputStreamImpl: cannot get local network interface: %s",
              GetSystemErrorInfo(errno));
    }

    try {
        std::vector<char> host;
        const char * pHost;
        host.resize(INET6_ADDRSTRLEN + 1);

        for (pifAddr = ifAddr; pifAddr != NULL; pifAddr = pifAddr->ifa_next) {
            addr = pifAddr->ifa_addr;

            if (!addr) {
                continue;
            }

            memset(&host[0], 0, INET6_ADDRSTRLEN + 1);

            if (addr->sa_family == AF_INET) {
                pHost =
                    inet_ntop(addr->sa_family,
                              &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr,
                              &host[0], INET6_ADDRSTRLEN);
            } else if (addr->sa_family == AF_INET6) {
                pHost =
                    inet_ntop(addr->sa_family,
                              &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr,
                              &host[0], INET6_ADDRSTRLEN);
            } else {
                continue;
            }

            if (NULL == pHost) {
                THROW(HdfsNetworkException,
                      "InputStreamImpl: cannot convert network address to textual form: %s",
                      GetSystemErrorInfo(errno));
            }

            set.insert(pHost);
        }

        /*
         * Add the hostname as well.
         */
        long hostlen = sysconf(_SC_HOST_NAME_MAX);
        host.resize(hostlen + 1);

        if (gethostname(&host[0], host.size())) {
            THROW(HdfsNetworkException,
                  "InputStreamImpl: cannot get hostname: %s",
                  GetSystemErrorInfo(errno));
        }

        set.insert(&host[0]);
    } catch (...) {
        if (ifAddr != NULL) {
            freeifaddrs(ifAddr);
        }

        throw;
    }

    if (ifAddr != NULL) {
        freeifaddrs(ifAddr);
    }

    return set;
}

InputStreamImpl::InputStreamImpl() :
    closed(true), localRead(true), readFromUnderConstructedBlock(false),
    verify(true), maxGetBlockInfoRetry(3), cursor(0), endOfCurBlock(0),
    lastBlockBeingWrittenLength(0), prefetchSize(0), peerCache(NULL) {
#ifdef MOCK
    stub = NULL;
#endif
}

InputStreamImpl::~InputStreamImpl() {
}

void InputStreamImpl::checkStatus() {
    if (closed) {
        THROW(HdfsIOException, "InputStreamImpl: stream is not opened.");
    }

    if (lastError != exception_ptr()) {
        rethrow_exception(lastError);
    }
}

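/**
 * Query the datanodes for the visible length of an under-construction block.
 * @param b the block whose visible length is wanted.
 * @return the visible length on success, 0 if every datanode reported
 *         ReplicaNotFound, or -1 if no datanode could answer.
 */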
int64_t InputStreamImpl::readBlockLength(const LocatedBlock & b) {
    const std::vector<DatanodeInfo> & nodes = b.getLocations();
    int replicaNotFoundCount = nodes.size();

    for (size_t i = 0; i < nodes.size(); ++i) {
        try {
            int64_t n = 0;
            shared_ptr<Datanode> dn;
            RpcAuth a = auth;
            a.getUser().addToken(b.getToken());
#ifdef MOCK

            if (stub) {
                dn = stub->getDatanode();
            } else {
                dn = shared_ptr<Datanode>(new DatanodeImpl(nodes[i].getIpAddr().c_str(),
                                          nodes[i].getIpcPort(), *conf, a));
            }

#else
            dn = shared_ptr<Datanode>(new DatanodeImpl(nodes[i].getIpAddr().c_str(),
                                      nodes[i].getIpcPort(), *conf, a));
#endif
            n = dn->getReplicaVisibleLength(b);

            if (n >= 0) {
                return n;
            }
        } catch (const ReplicaNotFoundException & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s",
                b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO,
                "InputStreamImpl: retry getting block visible length for Block: %s file %s from another datanode",
                b.toString().c_str(), path.c_str());
            --replicaNotFoundCount;
        } catch (const HdfsIOException & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s",
                b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO,
                "InputStreamImpl: retry getting block visible length for Block: %s file %s from another datanode",
                b.toString().c_str(), path.c_str());
        }
    }

    /*
     * The namenode told us about these locations, but none of them knows about
     * the replica, which means we hit the race between pipeline creation start
     * and end. We require every replica to report ReplicaNotFound, because some
     * other exception could have happened on a datanode that does hold the
     * replica, and we want to surface that error instead.
     */
    if (replicaNotFoundCount == 0) {
        return 0;
    }

    return -1;
}

/**
 * Get the blocks' location information from the namenode.
 */
void InputStreamImpl::updateBlockInfos() {
    int retry = maxGetBlockInfoRetry;

    for (int i = 0; i < retry; ++i) {
        try {
            if (!lbs) {
                lbs = shared_ptr<LocatedBlocksImpl>(new LocatedBlocksImpl);
            }

            filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs);

            if (lbs->isLastBlockComplete()) {
                lastBlockBeingWrittenLength = 0;
            } else {
                shared_ptr<LocatedBlock> last = lbs->getLastBlock();

                if (!last) {
                    lastBlockBeingWrittenLength = 0;
                } else {
                    lastBlockBeingWrittenLength = readBlockLength(*last);

                    if (lastBlockBeingWrittenLength == -1) {
                        if (i + 1 >= retry) {
                            THROW(HdfsIOException,
                                  "InputStreamImpl: failed to get block visible length for Block: %s from all Datanodes.",
                                  last->toString().c_str());
                        } else {
                            LOG(LOG_ERROR,
                                "InputStreamImpl: failed to get block visible length for Block: %s file %s from all Datanodes.",
                                last->toString().c_str(), path.c_str());

                            try {
                                sleep_for(milliseconds(4000));
                            } catch (...) {
                            }

                            continue;
                        }
                    }

                    last->setNumBytes(lastBlockBeingWrittenLength);
                }
            }

            return;
        } catch (const HdfsRpcException & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to get block information for file %s, %s",
                path.c_str(), GetExceptionDetail(e, buffer));

            if (i + 1 >= retry) {
                throw;
            }
        }

        LOG(INFO,
            "InputStreamImpl: retry to get block information for file: %s, already tried %d time(s).",
            path.c_str(), i + 1);
    }
}

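/**
 * Get the current file length, including the visible length of the last
 * block if it is still being written.
 */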
int64_t InputStreamImpl::getFileLength() {
    int64_t length = lbs->getFileLength();

    if (!lbs->isLastBlockComplete()) {
        length += lastBlockBeingWrittenLength;
    }

    return length;
}

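/**
 * Make the given located block the current block: remember its end offset,
 * clear the failed-node list and drop any existing block reader.
 * @param lb the block which contains the current cursor position.
 */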
void InputStreamImpl::seekToBlock(const LocatedBlock & lb) {
    if (cursor >= lbs->getFileLength()) {
        assert(!lbs->isLastBlockComplete());
        readFromUnderConstructedBlock = true;
    } else {
        readFromUnderConstructedBlock = false;
    }

    assert(cursor >= lb.getOffset()
           && cursor < lb.getOffset() + lb.getNumBytes());
    curBlock = shared_ptr<LocatedBlock>(new LocatedBlock(lb));
    int64_t blockSize = curBlock->getNumBytes();
    assert(blockSize > 0);
    endOfCurBlock = blockSize + curBlock->getOffset();
    failedNodes.clear();
    blockReader.reset();
}

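/**
 * Pick the next datanode of the current block that is not in the failed-node
 * list and store it in curNode.
 * @return true if such a datanode was found, false otherwise.
 */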
bool InputStreamImpl::choseBestNode() {
    const std::vector<DatanodeInfo> & nodes = curBlock->getLocations();

    for (size_t i = 0; i < nodes.size(); ++i) {
        if (std::binary_search(failedNodes.begin(), failedNodes.end(),
                               nodes[i])) {
            continue;
        }

        curNode = nodes[i];
        return true;
    }

    return false;
}

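/**
 * Check whether the current datanode runs on this host by comparing its
 * address against the cached set of local addresses.
 */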
bool InputStreamImpl::isLocalNode() {
    static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet();
    bool retval = LocalAddrSet.find(curNode.getIpAddr()) != LocalAddrSet.end();
    return retval;
}

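/**
 * Create a block reader for the current block, preferring a local
 * short-circuit reader when possible and falling back to a remote reader.
 * Keep trying other datanodes until one succeeds or all have failed.
 * @param temporaryDisableLocalRead do not use the local reader for this attempt.
 */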
void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) {
    bool lastReadFromLocal = false;
    exception_ptr lastException;

    while (true) {
        if (!choseBestNode()) {
            try {
                if (lastException) {
                    rethrow_exception(lastException);
                }
            } catch (...) {
                NESTED_THROW(HdfsIOException,
                             "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
                             curBlock->toString().c_str());
            }

            THROW(HdfsIOException,
                  "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
                  curBlock->toString().c_str());
        }

        try {
            int64_t offset, len;
            offset = cursor - curBlock->getOffset();
            assert(offset >= 0);
            len = curBlock->getNumBytes() - offset;
            assert(len > 0);

            if (!temporaryDisableLocalRead && !lastReadFromLocal &&
                    !readFromUnderConstructedBlock && localRead && isLocalNode()) {
                lastReadFromLocal = true;

                shared_ptr<ReadShortCircuitInfo> info;
                ReadShortCircuitInfoBuilder builder(curNode, auth, *conf);

                try {
                    info = builder.fetchOrCreate(*curBlock, curBlock->getToken());

                    if (!info) {
                        continue;
                    }

                    assert(info->isValid());
                    blockReader = shared_ptr<BlockReader>(
                                      new LocalBlockReader(info, *curBlock, offset, verify,
                                                           *conf, localReaderBuffer));
                } catch (...) {
                    if (info) {
                        info->setValid(false);
                    }

                    throw;
                }
            } else {
                const char * clientName = filesystem->getClientName();
                lastReadFromLocal = false;
                blockReader = shared_ptr<BlockReader>(new RemoteBlockReader(
                                  *curBlock, curNode, *peerCache, offset, len,
                                  curBlock->getToken(), clientName, verify, *conf));
            }

            break;
        } catch (const HdfsIOException & e) {
            lastException = current_exception();
            std::string buffer;

            if (lastReadFromLocal) {
                LOG(LOG_ERROR,
                    "cannot set up block reader for Block: %s file %s on Datanode: %s.\n%s\n"
                    "retry the same node but disable the read short-circuit feature",
                    curBlock->toString().c_str(), path.c_str(),
                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
                /*
                 * Do not add the node to failedNodes since we will retry the same node
                 * with local block reading disabled.
                 */
            } else {
                LOG(LOG_ERROR,
                    "cannot set up block reader for Block: %s file %s on Datanode: %s.\n%s\nretry another node",
                    curBlock->toString().c_str(), path.c_str(),
                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
                failedNodes.push_back(curNode);
                std::sort(failedNodes.begin(), failedNodes.end());
            }
        }
    }
}

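/**
 * Open a file for reading.
 * @param fs the filesystem used to communicate with the namenode.
 * @param path the path of the file to open.
 * @param verifyChecksum whether to verify the checksum of the data read.
 */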
void InputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path,
                           bool verifyChecksum) {
    if (NULL == path || 0 == strlen(path)) {
        THROW(InvalidParameter, "path is invalid.");
    }

    try {
        openInternal(fs, path, verifyChecksum);
    } catch (...) {
        close();
        throw;
    }
}

void InputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path,
                                   bool verifyChecksum) {
    try {
        filesystem = fs;
        verify = verifyChecksum;
        this->path = fs->getStandardPath(path);
        LOG(DEBUG2, "%p, open file %s for read, verifyChecksum is %s", this, this->path.c_str(), (verifyChecksum ? "true" : "false"));
        conf = shared_ptr<SessionConfig>(new SessionConfig(fs->getConf()));
        this->auth = RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getRpcAuthMethod()));
        prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize();
        localRead = conf->isReadFromLocal();
        maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry();
        peerCache = &fs->getPeerCache();
        updateBlockInfos();
        closed = false;
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const FileNotFoundException & e) {
        throw;
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.",
                     this->path.c_str());
    }
}

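/**
 * Read at most size bytes from the current position into buf.
 * @param buf the buffer to fill.
 * @param size the buffer size.
 * @return the number of bytes actually read; it may be less than size.
 */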
int32_t InputStreamImpl::read(char * buf, int32_t size) {
    checkStatus();

    try {
        int64_t previous = cursor;
        int32_t done = readInternal(buf, size);
        LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64 " done %d, next pos %" PRId64, this, path.c_str(), size,
            previous, done, cursor);
        return done;
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (...) {
        lastError = current_exception();
        throw;
    }
}

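/**
 * Read from the current block, setting up a block reader first if needed.
 * @param buf the buffer to fill.
 * @param size the buffer size.
 * @param shouldUpdateMetadataOnFailure whether the caller may still refresh
 *        the block metadata and retry after all replicas have failed.
 * @return the number of bytes read, or -1 to ask the caller to update the
 *         block metadata and try again.
 */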
int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure) {
    bool temporaryDisableLocalRead = false;
    std::string buffer;

    while (true) {
        try {
            /*
             * Set up the block reader here and handle failure.
             */
            if (!blockReader) {
                setupBlockReader(temporaryDisableLocalRead);
                temporaryDisableLocalRead = false;
            }
        } catch (const HdfsInvalidBlockToken & e) {
            std::string buffer;
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block information.",
                curBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
            return -1;
        } catch (const HdfsIOException & e) {
            /*
             * In setupBlockReader, we have tried all the replicas.
             * Update the block information once and try again.
             */
            if (shouldUpdateMetadataOnFailure) {
                LOG(LOG_ERROR,
                    "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block information.",
                    curBlock->toString().c_str(), path.c_str(),
                    GetExceptionDetail(e, buffer));
                return -1;
            } else {
                /*
                 * We have already updated the block information and failed again.
                 */
                throw;
            }
        }

        /*
         * The block reader has been set up, read from it.
         */
        try {
            int32_t todo = size;
            todo = todo < endOfCurBlock - cursor ?
                   todo : static_cast<int32_t>(endOfCurBlock - cursor);
            assert(blockReader);
            todo = blockReader->read(buf, todo);
            cursor += todo;
            /*
             * Exit the loop and the function from here on success.
             */
            return todo;
        } catch (const HdfsIOException & e) {
            /*
             * Failed to read from the current block reader,
             * add the current datanode to the invalid node list and try again.
             */
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, "
                "retry reading from another Datanode.",
                curBlock->toString().c_str(), path.c_str(),
                curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));

            if (conf->doesNotRetryAnotherNode()) {
                throw;
            }
        } catch (const ChecksumException & e) {
            LOG(LOG_ERROR,
                "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, "
                "retry reading from another Datanode.",
                curBlock->toString().c_str(), path.c_str(),
                curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
        }

        /*
         * Successfully created the block reader but failed to read.
         * Disable the local block reader and try the same node again.
         */
        if (!blockReader || dynamic_cast<LocalBlockReader *>(blockReader.get())) {
            temporaryDisableLocalRead = true;
        } else {
            /*
             * The remote block reader failed to read, try another node.
             */
            LOG(INFO, "InputStreamImpl: Add invalid datanode %s to failed datanodes and try another datanode again for file %s.",
                curNode.formatAddress().c_str(), path.c_str());
            failedNodes.push_back(curNode);
            std::sort(failedNodes.begin(), failedNodes.end());
        }

        blockReader.reset();
    }
}

/**
 * Read data from HDFS.
 * @param buf the buffer to fill.
 * @param size the buffer size.
 * @return the number of bytes read into the buffer; it may be less than size.
 */
int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
    int updateMetadataOnFailure = conf->getMaxReadBlockRetry();

    try {
        do {
            const LocatedBlock * lb = NULL;

            /*
             * Check if we already have the block information we need.
             */
            if (!lbs || cursor >= getFileLength()
                    || (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) {
                /*
                 * Get block information from the namenode.
                 * RPC failover is handled in updateBlockInfos.
                 */
                updateBlockInfos();

                /*
                 * We now have up-to-date block information;
                 * check whether we have reached the end of the file.
                 */
                if (cursor >= getFileLength()) {
                    THROW(HdfsEndOfStream,
                          "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %d, from file: %s",
                          cursor, size, path.c_str());
                }
            }

            /*
             * If we reached the end of the block or the block information has
             * just been updated, seek to the right block to read.
             */
            if (cursor >= endOfCurBlock) {
                lb = lbs->findBlock(cursor);

                if (!lb) {
                    THROW(HdfsIOException,
                          "InputStreamImpl: cannot find block information at position: %" PRId64 " for file: %s",
                          cursor, path.c_str());
                }

                /*
                 * Seek to the right block and set up all needed variables,
                 * but do not set up the block reader; that is done later.
                 */
                seekToBlock(*lb);
            }

            int32_t retval = readOneBlock(buf, size, updateMetadataOnFailure > 0);

            /*
             * We have tried all replicas and failed.
             * Update the metadata once and try again.
             */
            if (retval < 0) {
                lbs.reset();
                endOfCurBlock = 0;
                --updateMetadataOnFailure;

                try {
                    sleep_for(seconds(1));
                } catch (...) {
                }

                continue;
            }

            return retval;
        } while (true);
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (const HdfsException & e) {
        /*
         * Wrap the underlying error and rethrow.
         */
        NESTED_THROW(HdfsIOException,
                     "InputStreamImpl: cannot read file: %s, from position %" PRId64 ", size: %d.",
                     path.c_str(), cursor, size);
    }
}

/**
 * Read data from HDFS, blocking until the given number of bytes has been read.
 * @param buf the buffer to fill.
 * @param size the number of bytes to read.
 */
void InputStreamImpl::readFully(char * buf, int64_t size) {
    LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64, path.c_str(), size, cursor);
    checkStatus();

    try {
        return readFullyInternal(buf, size);
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (...) {
        lastError = current_exception();
        throw;
    }
}

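/**
 * Keep calling readInternal until the requested number of bytes has been read;
 * rethrows HdfsEndOfStream with position information if EOF is hit first.
 * @param buf the buffer to fill.
 * @param size the number of bytes to read.
 */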
void InputStreamImpl::readFullyInternal(char * buf, int64_t size) {
    int32_t done;
    int64_t pos = cursor, todo = size;

    try {
        while (todo > 0) {
            done = todo < std::numeric_limits<int32_t>::max() ?
                   static_cast<int32_t>(todo) :
                   std::numeric_limits<int32_t>::max();
            done = readInternal(buf + (size - todo), done);
            todo -= done;
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsEndOfStream & e) {
        THROW(HdfsEndOfStream,
              "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %" PRId64 ", from file: %s",
              pos, size, path.c_str());
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException,
                     "InputStreamImpl: cannot read fully from file: %s, from position %" PRId64 ", size: %" PRId64 ".",
                     path.c_str(), pos, size);
    }
}

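/**
 * Get the number of bytes available in the current block reader.
 * @return the number of available bytes, or 0 if no block reader is set up.
 */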
int64_t InputStreamImpl::available() {
    checkStatus();

    try {
        if (blockReader) {
            return blockReader->available();
        }
    } catch (...) {
        lastError = current_exception();
        throw;
    }

    return 0;
}

/**
 * Move the file pointer to the given position.
 * @param pos the target position.
 */
void InputStreamImpl::seek(int64_t pos) {
    LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this, path.c_str(), pos, cursor);
    checkStatus();

    try {
        seekInternal(pos);
    } catch (...) {
        lastError = current_exception();
        throw;
    }
}

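/**
 * Seek to the given position, skipping within the current block reader when
 * possible and otherwise resetting it so the next read reopens the right block.
 * @param pos the target position.
 */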
void InputStreamImpl::seekInternal(int64_t pos) {
    if (cursor == pos) {
        return;
    }

    if (!lbs || pos > getFileLength()) {
        updateBlockInfos();

        if (pos > getFileLength()) {
            THROW(HdfsEndOfStream,
                  "InputStreamImpl: seek over EOF, current position: %" PRId64 ", seek target: %" PRId64 ", in file: %s",
                  cursor, pos, path.c_str());
        }
    }

    try {
        if (blockReader && pos > cursor && pos < endOfCurBlock) {
            blockReader->skip(pos - cursor);
            cursor = pos;
            return;
        }
    } catch (const HdfsIOException & e) {
        std::string buffer;
        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s",
            pos - cursor, path.c_str(), GetExceptionDetail(e, buffer));
        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str());
    } catch (const ChecksumException & e) {
        std::string buffer;
        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s",
            pos - cursor, path.c_str(), GetExceptionDetail(e, buffer));
        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str());
    }

    /*
     * The seek target is beyond the current block, or the skip failed in the
     * current block reader. Reset the block reader and move the cursor to the
     * target position.
     */
    endOfCurBlock = 0;
    blockReader.reset();
    cursor = pos;
}

/**
 * Get the current position of the file pointer.
 * @return the position of the file pointer.
 */
int64_t InputStreamImpl::tell() {
    checkStatus();
    LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), cursor);
    return cursor;
}

/**
 * Close the stream.
 */
void InputStreamImpl::close() {
    LOG(DEBUG2, "%p close file %s for read", this, path.c_str());
    closed = true;
    localRead = true;
    readFromUnderConstructedBlock = false;
    verify = true;
    filesystem.reset();
    cursor = 0;
    endOfCurBlock = 0;
    lastBlockBeingWrittenLength = 0;
    prefetchSize = 0;
    blockReader.reset();
    curBlock.reset();
    lbs.reset();
    conf.reset();
    failedNodes.clear();
    path.clear();
    localReaderBuffer.resize(0);
    lastError = exception_ptr();
}

std::string InputStreamImpl::toString() {
    if (path.empty()) {
        return std::string("InputStream (not opened)");
    } else {
        return std::string("InputStream for path ") + path;
    }
}

}
}