/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "BigEndian.h"
#include "DataTransferProtocolSender.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "HWCrc32c.h"
#include "RemoteBlockReader.h"
#include "SWCrc32c.h"
#include "WriteBuffer.h"

#include <cassert>
#include <cstring>
#include <inttypes.h>
#include <vector>
namespace Hdfs {
namespace Internal {

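/*
 * Set up a block reader over the data transfer protocol: reuse a cached
 * connection to the datanode when one is available, send the read-block
 * request, and validate the datanode's response before any packet is read.
 */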
RemoteBlockReader::RemoteBlockReader(const ExtendedBlock& eb,
                                     DatanodeInfo& datanode,
                                     PeerCache& peerCache, int64_t start,
                                     int64_t len, const Token& token,
                                     const char* clientName, bool verify,
                                     SessionConfig& conf)
    : sentStatus(false),
      verify(verify),
      binfo(eb),
      datanode(datanode),
      checksumSize(0),
      chunkSize(0),
      position(0),
      size(0),
      cursor(start),
      endOffset(len + start),
      lastSeqNo(-1),
      peerCache(peerCache) {
    assert(start >= 0);
    readTimeout = conf.getInputReadTimeout();
    writeTimeout = conf.getInputWriteTimeout();
    connTimeout = conf.getInputConnTimeout();
    sock = getNextPeer(datanode);
    in = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
    sender = shared_ptr<DataTransferProtocol>(new DataTransferProtocolSender(
        *sock, writeTimeout, datanode.formatAddress()));
    sender->readBlock(eb, token, clientName, start, len);
    checkResponse();
}

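/*
 * Return the connection to the peer cache only if the read status has been
 * sent; otherwise the stream may still hold unread packet data, so the
 * socket cannot be safely reused and is closed instead.
 */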
RemoteBlockReader::~RemoteBlockReader() {
    if (sentStatus) {
        peerCache.addConnection(sock, datanode);
    } else {
        sock->close();
    }
}

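/*
 * Fetch a cached connection to the datanode, or establish a new TCP
 * connection (with Nagle's algorithm disabled) when the cache has none.
 */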
shared_ptr<Socket> RemoteBlockReader::getNextPeer(const DatanodeInfo& dn) {
    shared_ptr<Socket> sock;

    try {
        sock = peerCache.getConnection(dn);

        if (!sock) {
            sock = shared_ptr<Socket>(new TcpSocketImpl);
            sock->connect(dn.getIpAddr().c_str(), dn.getXferPort(),
                          connTimeout);
            sock->setNoDelay(true);
        }
    } catch (const HdfsTimeoutException & e) {
        NESTED_THROW(HdfsIOException,
                     "RemoteBlockReader: Failed to connect to %s",
                     dn.formatAddress().c_str());
    }

    return sock;
}

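/*
 * Read and validate the datanode's BlockOpResponseProto: check the op
 * status, select the checksum implementation advertised by the datanode,
 * and verify that the first chunk offset covers the requested position.
 */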
void RemoteBlockReader::checkResponse() {
    std::vector<char> respBuffer;
    int32_t respSize = in->readVarint32(readTimeout);

    if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
        THROW(HdfsIOException, "RemoteBlockReader received an invalid response size: %d, Block: %s, from Datanode: %s",
              respSize, binfo.toString().c_str(), datanode.formatAddress().c_str());
    }

    respBuffer.resize(respSize);
    in->readFully(&respBuffer[0], respSize, readTimeout);
    BlockOpResponseProto resp;

    if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
        THROW(HdfsIOException, "RemoteBlockReader cannot parse BlockOpResponseProto from Datanode response, "
              "Block: %s, from Datanode: %s", binfo.toString().c_str(), datanode.formatAddress().c_str());
    }

    if (resp.status() != Status::DT_PROTO_SUCCESS) {
        std::string msg;

        if (resp.has_message()) {
            msg = resp.message();
        }

        if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) {
            THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token is invalid. Datanode: %s, Block: %s",
                  datanode.formatAddress().c_str(), binfo.toString().c_str());
        } else {
            THROW(HdfsIOException,
                  "RemoteBlockReader: Datanode returned an error for the read request, Datanode: %s, Block: %s, %s.",
                  datanode.formatAddress().c_str(), binfo.toString().c_str(),
                  (msg.empty() ? "check the Datanode's log for more information" : msg.c_str()));
        }
    }

    const ReadOpChecksumInfoProto & checksumInfo = resp.readopchecksuminfo();
    const ChecksumProto & cs = checksumInfo.checksum();
    chunkSize = cs.bytesperchecksum();

    if (chunkSize < 0) {
        THROW(HdfsIOException,
              "RemoteBlockReader received an invalid chunk size: %d, expected range [0, %" PRId64 "], Block: %s, from Datanode: %s",
              chunkSize, binfo.getNumBytes(), binfo.toString().c_str(), datanode.formatAddress().c_str());
    }

    switch (cs.type()) {
    case ChecksumTypeProto::CHECKSUM_NULL:
        verify = false;
        checksumSize = 0;
        break;

    case ChecksumTypeProto::CHECKSUM_CRC32:
        THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 checksum, Block: %s, from Datanode: %s",
              binfo.toString().c_str(), datanode.formatAddress().c_str());
        break;

    case ChecksumTypeProto::CHECKSUM_CRC32C:
        if (HWCrc32c::available()) {
            checksum = shared_ptr<Checksum>(new HWCrc32c());
        } else {
            checksum = shared_ptr<Checksum>(new SWCrc32c());
        }

        checksumSize = sizeof(int32_t);
        break;

    default:
        THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum type: %d, Block: %s, from Datanode: %s",
              static_cast<int>(cs.type()), binfo.toString().c_str(), datanode.formatAddress().c_str());
    }

    /*
     * The offset into the block at which the first packet
     * will start. This is necessary since reads will align
     * backwards to a checksum chunk boundary.
     */
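    /*
     * For example (assuming the typical 512-byte chunk size), a read
     * starting at cursor 1000 is served from chunk offset 512; the 488
     * bytes ahead of the requested position are later skipped as
     * "pendingAhead" data in readNextPacket().
     */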
    int64_t firstChunkOffset = checksumInfo.chunkoffset();

    if (firstChunkOffset < 0 || firstChunkOffset > cursor || firstChunkOffset <= cursor - chunkSize) {
        THROW(HdfsIOException,
              "RemoteBlockReader received an invalid first chunk offset: %" PRId64 ", expected range [0, %" PRId64 "], Block: %s, from Datanode: %s",
              firstChunkOffset, cursor, binfo.toString().c_str(), datanode.formatAddress().c_str());
    }
}

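/*
 * Read the fixed-size header that precedes every packet. Reading past a
 * packet marked "last in block" is an error.
 */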
shared_ptr<PacketHeader> RemoteBlockReader::readPacketHeader() {
    try {
        shared_ptr<PacketHeader> retval;
        static const int packetHeaderLen = PacketHeader::GetPkgHeaderSize();
        std::vector<char> buf(packetHeaderLen);

        if (lastHeader && lastHeader->isLastPacketInBlock()) {
            THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s.",
                  datanode.formatAddress().c_str(), binfo.toString().c_str());
        }

        in->readFully(&buf[0], packetHeaderLen, readTimeout);
        retval = shared_ptr<PacketHeader>(new PacketHeader);
        retval->readFields(&buf[0], packetHeaderLen);
        return retval;
    } catch (const HdfsIOException & e) {
        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read packet header for Block: %s from Datanode: %s.",
                     binfo.toString().c_str(), datanode.formatAddress().c_str());
    }
}

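/*
 * Read one packet into the buffer. The payload is laid out as the checksum
 * data for every chunk followed by the chunk data itself; "position" is
 * advanced past the checksums (and past any bytes before the requested
 * offset) so that read() only ever copies user-visible data.
 */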
void RemoteBlockReader::readNextPacket() {
    assert(position >= size);
    lastHeader = readPacketHeader();
    int dataSize = lastHeader->getDataLen();
    int64_t pendingAhead = 0;

    if (!lastHeader->sanityCheck(lastSeqNo)) {
        THROW(HdfsIOException, "RemoteBlockReader: packet failed the sanity check for Block: %s from Datanode: %s.",
              binfo.toString().c_str(), datanode.formatAddress().c_str());
    }

    assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t));

    if (dataSize > 0) {
        int chunks = (dataSize + chunkSize - 1) / chunkSize;
        int checksumLen = chunks * checksumSize;
        size = checksumLen + dataSize;
        assert(size == lastHeader->getPacketLen() - static_cast<int>(sizeof(int32_t)));
        buffer.resize(size);
        in->readFully(&buffer[0], size, readTimeout);
        lastSeqNo = lastHeader->getSeqno();

        if (lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) + dataSize + checksumLen) {
            THROW(HdfsIOException, "Invalid Packet, packetLen is %d, dataSize is %d, checksum size is %d",
                  lastHeader->getPacketLen(), dataSize, checksumLen);
        }

        if (verify) {
            verifyChecksum(chunks);
        }

        /*
         * Skip over the checksum data at the front of the packet buffer.
         */
        position = checksumLen;
        /*
         * The first packet may start before the position we requested,
         * since reads align backwards to a checksum chunk boundary.
         */
        pendingAhead = cursor - lastHeader->getOffsetInBlock();
        pendingAhead = pendingAhead > 0 ? pendingAhead : 0;
        position += pendingAhead;
    }

    /*
     * Once we reach the end of the requested range and the datanode will
     * not send any more data, consume the trailing empty packet and send
     * the read status back to the datanode.
     */
    if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) {
        sendStatus();
    }
}

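/*
 * The datanode terminates the stream with an empty packet flagged as the
 * last packet in the block; consume it and report whether it was found.
 */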
bool RemoteBlockReader::readTrailingEmptyPacket() {
    shared_ptr<PacketHeader> trailingHeader = readPacketHeader();
    return trailingHeader->isLastPacketInBlock() && trailingHeader->getDataLen() == 0;
}

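/*
 * Tell the datanode how the read went: CHECKSUM_OK when packets were
 * verified, plain SUCCESS otherwise. A successfully sent status is what
 * allows the connection to be returned to the peer cache for reuse.
 */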
void RemoteBlockReader::sendStatus() {
    ClientReadStatusProto status;

    if (verify) {
        status.set_status(Status::DT_PROTO_CHECKSUM_OK);
    } else {
        status.set_status(Status::DT_PROTO_SUCCESS);
    }

    WriteBuffer buffer;
    int size = status.ByteSize();
    buffer.writeVarint32(size);
    status.SerializeToArray(buffer.alloc(size), size);
    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout);
    sentStatus = true;
}

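/*
 * Recompute the CRC32C of each chunk in the packet and compare it against
 * the checksum the datanode sent ahead of the data.
 */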
void RemoteBlockReader::verifyChecksum(int chunks) {
    int dataSize = lastHeader->getDataLen();
    char * pchecksum = &buffer[0];
    char * pdata = &buffer[0] + (chunks * checksumSize);

    for (int i = 0; i < chunks; ++i) {
        int size = chunkSize < dataSize ? chunkSize : dataSize;
        dataSize -= size;
        checksum->reset();
        checksum->update(pdata + (i * chunkSize), size);
        uint32_t result = checksum->getValue();
        uint32_t target = ReadBigEndian32FromArray(pchecksum + (i * checksumSize));

        /*
         * Only full chunks are checked here; a mismatch on a partial
         * trailing chunk is tolerated.
         */
        if (result != target && size == chunkSize) {
            THROW(ChecksumException, "RemoteBlockReader: checksum mismatch for Block: %s, on Datanode: %s",
                  binfo.toString().c_str(), datanode.formatAddress().c_str());
        }
    }

    assert(0 == dataSize);
}

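/*
 * Number of bytes already buffered and not yet returned to the caller.
 */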
int64_t RemoteBlockReader::available() {
    return size - position > 0 ? size - position : 0;
}

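/*
 * Copy up to len bytes from the current packet into buf, fetching the next
 * packet first when the buffer is exhausted. Returns the number of bytes
 * actually copied, which may be less than len.
 */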
int32_t RemoteBlockReader::read(char * buf, int32_t len) {
    assert(0 != len && NULL != buf);

    if (cursor >= endOffset) {
        THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s.",
              datanode.formatAddress().c_str(), binfo.toString().c_str());
    }

    try {
        if (position >= size) {
            readNextPacket();
        }

        int32_t todo = len < size - position ? len : size - position;
        memcpy(buf, &buffer[position], todo);
        position += todo;
        cursor += todo;
        return todo;
    } catch (const HdfsTimeoutException & e) {
        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.",
                     binfo.toString().c_str(), datanode.formatAddress().c_str());
    } catch (const HdfsNetworkException & e) {
        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.",
                     binfo.toString().c_str(), datanode.formatAddress().c_str());
    }
}

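/*
 * Advance the cursor by len bytes without copying data out, still reading
 * (and optionally verifying) whole packets from the datanode.
 */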
void RemoteBlockReader::skip(int64_t len) {
    int64_t todo = len;
    assert(cursor + len <= endOffset);

    try {
        while (todo > 0) {
            if (cursor >= endOffset) {
                THROW(HdfsIOException, "RemoteBlockReader: skip over block end from Datanode: %s, Block: %s.",
                      datanode.formatAddress().c_str(), binfo.toString().c_str());
            }

            if (position >= size) {
                readNextPacket();
            }

            int batch = size - position;
            batch = batch < todo ? batch : static_cast<int>(todo);
            position += batch;
            cursor += batch;
            todo -= batch;
        }
    } catch (const HdfsTimeoutException & e) {
        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to skip in Block: %s from Datanode: %s.",
                     binfo.toString().c_str(), datanode.formatAddress().c_str());
    } catch (const HdfsNetworkException & e) {
        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to skip in Block: %s from Datanode: %s.",
                     binfo.toString().c_str(), datanode.formatAddress().c_str());
    }
}

}  // namespace Internal
}  // namespace Hdfs