1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "BigEndian.h" |
29 | #include "DataTransferProtocolSender.h" |
30 | #include "Exception.h" |
31 | #include "ExceptionInternal.h" |
32 | #include "HWCrc32c.h" |
33 | #include "RemoteBlockReader.h" |
34 | #include "SWCrc32c.h" |
35 | #include "WriteBuffer.h" |
36 | |
37 | #include <inttypes.h> |
38 | #include <vector> |
39 | |
40 | namespace Hdfs { |
41 | namespace Internal { |
42 | |
43 | RemoteBlockReader::RemoteBlockReader(const ExtendedBlock& eb, |
44 | DatanodeInfo& datanode, |
45 | PeerCache& peerCache, int64_t start, |
46 | int64_t len, const Token& token, |
47 | const char* clientName, bool verify, |
48 | SessionConfig& conf) |
49 | : sentStatus(false), |
50 | verify(verify), |
51 | binfo(eb), |
52 | datanode(datanode), |
53 | checksumSize(0), |
54 | chunkSize(0), |
55 | position(0), |
56 | size(0), |
57 | cursor(start), |
58 | endOffset(len + start), |
59 | lastSeqNo(-1), |
60 | peerCache(peerCache) { |
61 | |
62 | assert(start >= 0); |
63 | readTimeout = conf.getInputReadTimeout(); |
64 | writeTimeout = conf.getInputWriteTimeout(); |
65 | connTimeout = conf.getInputConnTimeout(); |
66 | sock = getNextPeer(datanode); |
67 | in = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock)); |
68 | sender = shared_ptr<DataTransferProtocol>(new DataTransferProtocolSender( |
69 | *sock, writeTimeout, datanode.formatAddress())); |
70 | sender->readBlock(eb, token, clientName, start, len); |
71 | checkResponse(); |
72 | } |
73 | |
74 | RemoteBlockReader::~RemoteBlockReader() { |
75 | if (sentStatus) { |
76 | peerCache.addConnection(sock, datanode); |
77 | } else { |
78 | sock->close(); |
79 | } |
80 | } |
81 | |
82 | shared_ptr<Socket> RemoteBlockReader::getNextPeer(const DatanodeInfo& dn) { |
83 | shared_ptr<Socket> sock; |
84 | try { |
85 | sock = peerCache.getConnection(dn); |
86 | |
87 | if (!sock) { |
88 | sock = shared_ptr<Socket>(new TcpSocketImpl); |
89 | sock->connect(dn.getIpAddr().c_str(), dn.getXferPort(), |
90 | connTimeout); |
91 | sock->setNoDelay(true); |
92 | } |
93 | } catch (const HdfsTimeoutException & e) { |
94 | NESTED_THROW(HdfsIOException, |
95 | "RemoteBlockReader: Failed to connect to %s" , |
96 | dn.formatAddress().c_str()); |
97 | } |
98 | |
99 | return sock; |
100 | } |
101 | |
102 | void RemoteBlockReader::checkResponse() { |
103 | std::vector<char> respBuffer; |
104 | int32_t respSize = in->readVarint32(readTimeout); |
105 | |
106 | if (respSize <= 0 || respSize > 10 * 1024 * 1024) { |
107 | THROW(HdfsIOException, "RemoteBlockReader get a invalid response size: %d, Block: %s, from Datanode: %s" , |
108 | respSize, binfo.toString().c_str(), datanode.formatAddress().c_str()); |
109 | } |
110 | |
111 | respBuffer.resize(respSize); |
112 | in->readFully(&respBuffer[0], respSize, readTimeout); |
113 | BlockOpResponseProto resp; |
114 | |
115 | if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) { |
116 | THROW(HdfsIOException, "RemoteBlockReader cannot parse BlockOpResponseProto from Datanode response, " |
117 | "Block: %s, from Datanode: %s" , binfo.toString().c_str(), datanode.formatAddress().c_str()); |
118 | } |
119 | |
120 | if (resp.status() != Status::DT_PROTO_SUCCESS) { |
121 | std::string msg; |
122 | |
123 | if (resp.has_message()) { |
124 | msg = resp.message(); |
125 | } |
126 | |
127 | if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) { |
128 | THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token is invalid. Datanode: %s, Block: %s" , |
129 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
130 | } else { |
131 | THROW(HdfsIOException, |
132 | "RemoteBlockReader: Datanode return an error when sending read request to Datanode: %s, Block: %s, %s." , |
133 | datanode.formatAddress().c_str(), binfo.toString().c_str(), |
134 | (msg.empty() ? "check Datanode's log for more information" : msg.c_str())); |
135 | } |
136 | } |
137 | |
138 | const ReadOpChecksumInfoProto & checksumInfo = resp.readopchecksuminfo(); |
139 | const ChecksumProto & cs = checksumInfo.checksum(); |
140 | chunkSize = cs.bytesperchecksum(); |
141 | |
142 | if (chunkSize < 0) { |
143 | THROW(HdfsIOException, |
144 | "RemoteBlockReader invalid chunk size: %d, expected range[0, %" PRId64 "], Block: %s, from Datanode: %s" , |
145 | chunkSize, binfo.getNumBytes(), binfo.toString().c_str(), datanode.formatAddress().c_str()); |
146 | } |
147 | |
148 | switch (cs.type()) { |
149 | case ChecksumTypeProto::CHECKSUM_NULL: |
150 | verify = false; |
151 | checksumSize = 0; |
152 | break; |
153 | |
154 | case ChecksumTypeProto::CHECKSUM_CRC32: |
155 | THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 checksum, Block: %s, from Datanode: %s" , |
156 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
157 | break; |
158 | |
159 | case ChecksumTypeProto::CHECKSUM_CRC32C: |
160 | if (HWCrc32c::available()) { |
161 | checksum = shared_ptr<Checksum>(new HWCrc32c()); |
162 | } else { |
163 | checksum = shared_ptr<Checksum>(new SWCrc32c()); |
164 | } |
165 | |
166 | checksumSize = sizeof(int32_t); |
167 | break; |
168 | |
169 | default: |
170 | THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum type: %d, Block: %s, from Datanode: %s" , |
171 | static_cast<int>(cs.type()), binfo.toString().c_str(), datanode.formatAddress().c_str()); |
172 | } |
173 | |
174 | /* |
175 | * The offset into the block at which the first packet |
176 | * will start. This is necessary since reads will align |
177 | * backwards to a checksum chunk boundary. |
178 | */ |
179 | int64_t firstChunkOffset = checksumInfo.chunkoffset(); |
180 | |
181 | if (firstChunkOffset < 0 || firstChunkOffset > cursor || firstChunkOffset <= cursor - chunkSize) { |
182 | THROW(HdfsIOException, |
183 | "RemoteBlockReader invalid first chunk offset: %" PRId64 ", expected range[0, %" PRId64 "], " "Block: %s, from Datanode: %s" , |
184 | firstChunkOffset, cursor, binfo.toString().c_str(), datanode.formatAddress().c_str()); |
185 | } |
186 | } |
187 | |
188 | shared_ptr<PacketHeader> RemoteBlockReader::() { |
189 | try { |
190 | shared_ptr<PacketHeader> retval; |
191 | static const int = PacketHeader::GetPkgHeaderSize(); |
192 | std::vector<char> buf(packetHeaderLen); |
193 | |
194 | if (lastHeader && lastHeader->isLastPacketInBlock()) { |
195 | THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s." , |
196 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
197 | } |
198 | |
199 | in->readFully(&buf[0], packetHeaderLen, readTimeout); |
200 | retval = shared_ptr<PacketHeader>(new PacketHeader); |
201 | retval->readFields(&buf[0], packetHeaderLen); |
202 | return retval; |
203 | } catch (const HdfsIOException & e) { |
204 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read block header for Block: %s from Datanode: %s." , |
205 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
206 | } |
207 | } |
208 | |
209 | void RemoteBlockReader::readNextPacket() { |
210 | assert(position >= size); |
211 | lastHeader = readPacketHeader(); |
212 | int dataSize = lastHeader->getDataLen(); |
213 | int64_t pendingAhead = 0; |
214 | |
215 | if (!lastHeader->sanityCheck(lastSeqNo)) { |
216 | THROW(HdfsIOException, "RemoteBlockReader: Packet failed on sanity check for block %s from Datanode %s." , |
217 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
218 | } |
219 | |
220 | assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t)); |
221 | |
222 | if (dataSize > 0) { |
223 | int chunks = (dataSize + chunkSize - 1) / chunkSize; |
224 | int checksumLen = chunks * checksumSize; |
225 | size = checksumLen + dataSize; |
226 | assert(size == lastHeader->getPacketLen() - static_cast<int>(sizeof(int32_t))); |
227 | buffer.resize(size); |
228 | in->readFully(&buffer[0], size, readTimeout); |
229 | lastSeqNo = lastHeader->getSeqno(); |
230 | |
231 | if (lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) + dataSize + checksumLen) { |
232 | THROW(HdfsIOException, "Invalid Packet, packetLen is %d, dataSize is %d, checksum size is %d" , |
233 | lastHeader->getPacketLen(), dataSize, checksumLen); |
234 | } |
235 | |
236 | if (verify) { |
237 | verifyChecksum(chunks); |
238 | } |
239 | |
240 | /* |
241 | * skip checksum |
242 | */ |
243 | position = checksumLen; |
244 | /* |
245 | * the first packet we get may start at the position before we required |
246 | */ |
247 | pendingAhead = cursor - lastHeader->getOffsetInBlock(); |
248 | pendingAhead = pendingAhead > 0 ? pendingAhead : 0; |
249 | position += pendingAhead; |
250 | } |
251 | |
252 | /* |
253 | * we reach the end of the range we required, send status to datanode |
254 | * if datanode do not sending data anymore. |
255 | */ |
256 | |
257 | if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) { |
258 | sendStatus(); |
259 | } |
260 | } |
261 | |
262 | bool RemoteBlockReader::readTrailingEmptyPacket() { |
263 | shared_ptr<PacketHeader> = readPacketHeader(); |
264 | |
265 | if (!trailingHeader->isLastPacketInBlock() || trailingHeader->getDataLen() != 0) { |
266 | return false; |
267 | } |
268 | |
269 | return true; |
270 | } |
271 | |
272 | void RemoteBlockReader::sendStatus() { |
273 | ClientReadStatusProto status; |
274 | |
275 | if (verify) { |
276 | status.set_status(Status::DT_PROTO_CHECKSUM_OK); |
277 | } else { |
278 | status.set_status(Status::DT_PROTO_SUCCESS); |
279 | } |
280 | |
281 | WriteBuffer buffer; |
282 | int size = status.ByteSize(); |
283 | buffer.writeVarint32(size); |
284 | status.SerializeToArray(buffer.alloc(size), size); |
285 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout); |
286 | sentStatus = true; |
287 | } |
288 | |
289 | void RemoteBlockReader::verifyChecksum(int chunks) { |
290 | int dataSize = lastHeader->getDataLen(); |
291 | char * pchecksum = &buffer[0]; |
292 | char * pdata = &buffer[0] + (chunks * checksumSize); |
293 | |
294 | for (int i = 0; i < chunks; ++i) { |
295 | int size = chunkSize < dataSize ? chunkSize : dataSize; |
296 | dataSize -= size; |
297 | checksum->reset(); |
298 | checksum->update(pdata + (i * chunkSize), size); |
299 | uint32_t result = checksum->getValue(); |
300 | uint32_t target = ReadBigEndian32FromArray(pchecksum + (i * checksumSize)); |
301 | |
302 | if (result != target && size == chunkSize) { |
303 | THROW(ChecksumException, "RemoteBlockReader: checksum not match for Block: %s, on Datanode: %s" , |
304 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
305 | } |
306 | } |
307 | |
308 | assert(0 == dataSize); |
309 | } |
310 | |
311 | int64_t RemoteBlockReader::available() { |
312 | return size - position > 0 ? size - position : 0; |
313 | } |
314 | |
315 | int32_t RemoteBlockReader::read(char * buf, int32_t len) { |
316 | assert(0 != len && NULL != buf); |
317 | |
318 | if (cursor >= endOffset) { |
319 | THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s." , |
320 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
321 | } |
322 | |
323 | try { |
324 | if (position >= size) { |
325 | readNextPacket(); |
326 | } |
327 | |
328 | int32_t todo = len < size - position ? len : size - position; |
329 | memcpy(buf, &buffer[position], todo); |
330 | position += todo; |
331 | cursor += todo; |
332 | return todo; |
333 | } catch (const HdfsTimeoutException & e) { |
334 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
335 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
336 | } catch (const HdfsNetworkException & e) { |
337 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
338 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
339 | } |
340 | } |
341 | |
342 | void RemoteBlockReader::skip(int64_t len) { |
343 | int64_t todo = len; |
344 | assert(cursor + len <= endOffset); |
345 | |
346 | try { |
347 | while (todo > 0) { |
348 | if (cursor >= endOffset) { |
349 | THROW(HdfsIOException, "RemoteBlockReader: skip over block end from Datanode: %s, Block: %s." , |
350 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
351 | } |
352 | |
353 | if (position >= size) { |
354 | readNextPacket(); |
355 | } |
356 | |
357 | int batch = size - position; |
358 | batch = batch < todo ? batch : static_cast<int>(todo); |
359 | position += batch; |
360 | cursor += batch; |
361 | todo -= batch; |
362 | } |
363 | } catch (const HdfsTimeoutException & e) { |
364 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
365 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
366 | } catch (const HdfsNetworkException & e) { |
367 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
368 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
369 | } |
370 | } |
371 | |
372 | } |
373 | } |
374 | |