/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
28#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
29#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
30
31#include "platform.h"
32
33#include "BlockReader.h"
34#include "ExceptionInternal.h"
35#include "FileSystem.h"
36#include "Hash.h"
37#include "InputStreamInter.h"
38#include "Memory.h"
39#include "PeerCache.h"
40#include "rpc/RpcAuth.h"
41#include "server/Datanode.h"
42#include "server/LocatedBlock.h"
43#include "server/LocatedBlocks.h"
44#include "SessionConfig.h"
45#include "Unordered.h"
46
47#ifdef MOCK
48#include "TestDatanodeStub.h"
49#endif
50
51namespace Hdfs {
52namespace Internal {
53
54/**
55 * A input stream used read data from hdfs.
56 */
57class InputStreamImpl: public InputStreamInter {
58public:
59 InputStreamImpl();
60 ~InputStreamImpl();
61
62 /**
63 * Open a file to read
64 * @param fs hdfs file system.
65 * @param path the file to be read.
66 * @param verifyChecksum verify the checksum.
67 */
68 void open(shared_ptr<FileSystemInter> fs, const char * path, bool verifyChecksum);
69
70 /**
71 * To read data from hdfs.
72 * @param buf the buffer used to filled.
73 * @param size buffer size.
74 * @return return the number of bytes filled in the buffer, it may less than size.
75 */
76 int32_t read(char * buf, int32_t size);
77
78 /**
79 * To read data from hdfs, block until get the given size of bytes.
80 * @param buf the buffer used to filled.
81 * @param size the number of bytes to be read.
82 */
83 void readFully(char * buf, int64_t size);
84
85 int64_t available();
86
87 /**
88 * To move the file point to the given position.
89 * @param pos the given position.
90 */
91 void seek(int64_t pos);
92
93 /**
94 * To get the current file point position.
95 * @return the position of current file point.
96 */
97 int64_t tell();
98
99 /**
100 * Close the stream.
101 */
102 void close();
103
104 /**
105 * Convert to a printable string
106 *
107 * @return return a printable string
108 */
109 std::string toString();
110
111private:
112 bool choseBestNode();
113 bool isLocalNode();
114 int32_t readInternal(char * buf, int32_t size);
115 int32_t readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure);
116 int64_t getFileLength();
117 int64_t readBlockLength(const LocatedBlock & b);
118 void checkStatus();
119 void openInternal(shared_ptr<FileSystemInter> fs, const char * path,
120 bool verifyChecksum);
121 void readFullyInternal(char * buf, int64_t size);
122 void seekInternal(int64_t pos);
123 void seekToBlock(const LocatedBlock & lb);
124 void setupBlockReader(bool temporaryDisableLocalRead);
125 void updateBlockInfos();
126
127private:
128 bool closed;
129 bool localRead;
130 bool readFromUnderConstructedBlock;
131 bool verify;
132 DatanodeInfo curNode;
133 exception_ptr lastError;
134 FileStatus fileInfo;
135 int maxGetBlockInfoRetry;
136 int64_t cursor;
137 int64_t endOfCurBlock;
138 int64_t lastBlockBeingWrittenLength;
139 int64_t prefetchSize;
140 PeerCache *peerCache;
141 RpcAuth auth;
142 shared_ptr<BlockReader> blockReader;
143 shared_ptr<FileSystemInter> filesystem;
144 shared_ptr<LocatedBlock> curBlock;
145 shared_ptr<LocatedBlocks> lbs;
146 shared_ptr<SessionConfig> conf;
147 std::string path;
148 std::vector<DatanodeInfo> failedNodes;
149 std::vector<char> localReaderBuffer;
150
151#ifdef MOCK
152private:
153 Hdfs::Mock::TestDatanodeStub * stub;
154#endif
155};
156
157}
158}
159
160#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_ */
161