| 1 | /******************************************************************** |
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Author: Zhanwei Wang |
| 6 | ********************************************************************/ |
| 7 | /******************************************************************** |
| 8 | * 2014 - |
| 9 | * open source under Apache License Version 2.0 |
| 10 | ********************************************************************/ |
| 11 | /** |
| 12 | * Licensed to the Apache Software Foundation (ASF) under one |
| 13 | * or more contributor license agreements. See the NOTICE file |
| 14 | * distributed with this work for additional information |
| 15 | * regarding copyright ownership. The ASF licenses this file |
| 16 | * to you under the Apache License, Version 2.0 (the |
| 17 | * "License"); you may not use this file except in compliance |
| 18 | * with the License. You may obtain a copy of the License at |
| 19 | * |
| 20 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | * |
| 22 | * Unless required by applicable law or agreed to in writing, software |
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | * See the License for the specific language governing permissions and |
| 26 | * limitations under the License. |
| 27 | */ |
| 28 | #include "BigEndian.h" |
| 29 | #include "DataTransferProtocolSender.h" |
| 30 | #include "Exception.h" |
| 31 | #include "ExceptionInternal.h" |
| 32 | #include "HWCrc32c.h" |
| 33 | #include "RemoteBlockReader.h" |
| 34 | #include "SWCrc32c.h" |
| 35 | #include "WriteBuffer.h" |
| 36 | |
| 37 | #include <inttypes.h> |
| 38 | #include <vector> |
| 39 | |
| 40 | namespace Hdfs { |
| 41 | namespace Internal { |
| 42 | |
| 43 | RemoteBlockReader::RemoteBlockReader(const ExtendedBlock& eb, |
| 44 | DatanodeInfo& datanode, |
| 45 | PeerCache& peerCache, int64_t start, |
| 46 | int64_t len, const Token& token, |
| 47 | const char* clientName, bool verify, |
| 48 | SessionConfig& conf) |
| 49 | : sentStatus(false), |
| 50 | verify(verify), |
| 51 | binfo(eb), |
| 52 | datanode(datanode), |
| 53 | checksumSize(0), |
| 54 | chunkSize(0), |
| 55 | position(0), |
| 56 | size(0), |
| 57 | cursor(start), |
| 58 | endOffset(len + start), |
| 59 | lastSeqNo(-1), |
| 60 | peerCache(peerCache) { |
| 61 | |
| 62 | assert(start >= 0); |
| 63 | readTimeout = conf.getInputReadTimeout(); |
| 64 | writeTimeout = conf.getInputWriteTimeout(); |
| 65 | connTimeout = conf.getInputConnTimeout(); |
| 66 | sock = getNextPeer(datanode); |
| 67 | in = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock)); |
| 68 | sender = shared_ptr<DataTransferProtocol>(new DataTransferProtocolSender( |
| 69 | *sock, writeTimeout, datanode.formatAddress())); |
| 70 | sender->readBlock(eb, token, clientName, start, len); |
| 71 | checkResponse(); |
| 72 | } |
| 73 | |
| 74 | RemoteBlockReader::~RemoteBlockReader() { |
| 75 | if (sentStatus) { |
| 76 | peerCache.addConnection(sock, datanode); |
| 77 | } else { |
| 78 | sock->close(); |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | shared_ptr<Socket> RemoteBlockReader::getNextPeer(const DatanodeInfo& dn) { |
| 83 | shared_ptr<Socket> sock; |
| 84 | try { |
| 85 | sock = peerCache.getConnection(dn); |
| 86 | |
| 87 | if (!sock) { |
| 88 | sock = shared_ptr<Socket>(new TcpSocketImpl); |
| 89 | sock->connect(dn.getIpAddr().c_str(), dn.getXferPort(), |
| 90 | connTimeout); |
| 91 | sock->setNoDelay(true); |
| 92 | } |
| 93 | } catch (const HdfsTimeoutException & e) { |
| 94 | NESTED_THROW(HdfsIOException, |
| 95 | "RemoteBlockReader: Failed to connect to %s" , |
| 96 | dn.formatAddress().c_str()); |
| 97 | } |
| 98 | |
| 99 | return sock; |
| 100 | } |
| 101 | |
| 102 | void RemoteBlockReader::checkResponse() { |
| 103 | std::vector<char> respBuffer; |
| 104 | int32_t respSize = in->readVarint32(readTimeout); |
| 105 | |
| 106 | if (respSize <= 0 || respSize > 10 * 1024 * 1024) { |
| 107 | THROW(HdfsIOException, "RemoteBlockReader get a invalid response size: %d, Block: %s, from Datanode: %s" , |
| 108 | respSize, binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 109 | } |
| 110 | |
| 111 | respBuffer.resize(respSize); |
| 112 | in->readFully(&respBuffer[0], respSize, readTimeout); |
| 113 | BlockOpResponseProto resp; |
| 114 | |
| 115 | if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) { |
| 116 | THROW(HdfsIOException, "RemoteBlockReader cannot parse BlockOpResponseProto from Datanode response, " |
| 117 | "Block: %s, from Datanode: %s" , binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 118 | } |
| 119 | |
| 120 | if (resp.status() != Status::DT_PROTO_SUCCESS) { |
| 121 | std::string msg; |
| 122 | |
| 123 | if (resp.has_message()) { |
| 124 | msg = resp.message(); |
| 125 | } |
| 126 | |
| 127 | if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) { |
| 128 | THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token is invalid. Datanode: %s, Block: %s" , |
| 129 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
| 130 | } else { |
| 131 | THROW(HdfsIOException, |
| 132 | "RemoteBlockReader: Datanode return an error when sending read request to Datanode: %s, Block: %s, %s." , |
| 133 | datanode.formatAddress().c_str(), binfo.toString().c_str(), |
| 134 | (msg.empty() ? "check Datanode's log for more information" : msg.c_str())); |
| 135 | } |
| 136 | } |
| 137 | |
| 138 | const ReadOpChecksumInfoProto & checksumInfo = resp.readopchecksuminfo(); |
| 139 | const ChecksumProto & cs = checksumInfo.checksum(); |
| 140 | chunkSize = cs.bytesperchecksum(); |
| 141 | |
| 142 | if (chunkSize < 0) { |
| 143 | THROW(HdfsIOException, |
| 144 | "RemoteBlockReader invalid chunk size: %d, expected range[0, %" PRId64 "], Block: %s, from Datanode: %s" , |
| 145 | chunkSize, binfo.getNumBytes(), binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 146 | } |
| 147 | |
| 148 | switch (cs.type()) { |
| 149 | case ChecksumTypeProto::CHECKSUM_NULL: |
| 150 | verify = false; |
| 151 | checksumSize = 0; |
| 152 | break; |
| 153 | |
| 154 | case ChecksumTypeProto::CHECKSUM_CRC32: |
| 155 | THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 checksum, Block: %s, from Datanode: %s" , |
| 156 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 157 | break; |
| 158 | |
| 159 | case ChecksumTypeProto::CHECKSUM_CRC32C: |
| 160 | if (HWCrc32c::available()) { |
| 161 | checksum = shared_ptr<Checksum>(new HWCrc32c()); |
| 162 | } else { |
| 163 | checksum = shared_ptr<Checksum>(new SWCrc32c()); |
| 164 | } |
| 165 | |
| 166 | checksumSize = sizeof(int32_t); |
| 167 | break; |
| 168 | |
| 169 | default: |
| 170 | THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum type: %d, Block: %s, from Datanode: %s" , |
| 171 | static_cast<int>(cs.type()), binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 172 | } |
| 173 | |
| 174 | /* |
| 175 | * The offset into the block at which the first packet |
| 176 | * will start. This is necessary since reads will align |
| 177 | * backwards to a checksum chunk boundary. |
| 178 | */ |
| 179 | int64_t firstChunkOffset = checksumInfo.chunkoffset(); |
| 180 | |
| 181 | if (firstChunkOffset < 0 || firstChunkOffset > cursor || firstChunkOffset <= cursor - chunkSize) { |
| 182 | THROW(HdfsIOException, |
| 183 | "RemoteBlockReader invalid first chunk offset: %" PRId64 ", expected range[0, %" PRId64 "], " "Block: %s, from Datanode: %s" , |
| 184 | firstChunkOffset, cursor, binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | shared_ptr<PacketHeader> RemoteBlockReader::() { |
| 189 | try { |
| 190 | shared_ptr<PacketHeader> retval; |
| 191 | static const int = PacketHeader::GetPkgHeaderSize(); |
| 192 | std::vector<char> buf(packetHeaderLen); |
| 193 | |
| 194 | if (lastHeader && lastHeader->isLastPacketInBlock()) { |
| 195 | THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s." , |
| 196 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
| 197 | } |
| 198 | |
| 199 | in->readFully(&buf[0], packetHeaderLen, readTimeout); |
| 200 | retval = shared_ptr<PacketHeader>(new PacketHeader); |
| 201 | retval->readFields(&buf[0], packetHeaderLen); |
| 202 | return retval; |
| 203 | } catch (const HdfsIOException & e) { |
| 204 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read block header for Block: %s from Datanode: %s." , |
| 205 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | void RemoteBlockReader::readNextPacket() { |
| 210 | assert(position >= size); |
| 211 | lastHeader = readPacketHeader(); |
| 212 | int dataSize = lastHeader->getDataLen(); |
| 213 | int64_t pendingAhead = 0; |
| 214 | |
| 215 | if (!lastHeader->sanityCheck(lastSeqNo)) { |
| 216 | THROW(HdfsIOException, "RemoteBlockReader: Packet failed on sanity check for block %s from Datanode %s." , |
| 217 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 218 | } |
| 219 | |
| 220 | assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t)); |
| 221 | |
| 222 | if (dataSize > 0) { |
| 223 | int chunks = (dataSize + chunkSize - 1) / chunkSize; |
| 224 | int checksumLen = chunks * checksumSize; |
| 225 | size = checksumLen + dataSize; |
| 226 | assert(size == lastHeader->getPacketLen() - static_cast<int>(sizeof(int32_t))); |
| 227 | buffer.resize(size); |
| 228 | in->readFully(&buffer[0], size, readTimeout); |
| 229 | lastSeqNo = lastHeader->getSeqno(); |
| 230 | |
| 231 | if (lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) + dataSize + checksumLen) { |
| 232 | THROW(HdfsIOException, "Invalid Packet, packetLen is %d, dataSize is %d, checksum size is %d" , |
| 233 | lastHeader->getPacketLen(), dataSize, checksumLen); |
| 234 | } |
| 235 | |
| 236 | if (verify) { |
| 237 | verifyChecksum(chunks); |
| 238 | } |
| 239 | |
| 240 | /* |
| 241 | * skip checksum |
| 242 | */ |
| 243 | position = checksumLen; |
| 244 | /* |
| 245 | * the first packet we get may start at the position before we required |
| 246 | */ |
| 247 | pendingAhead = cursor - lastHeader->getOffsetInBlock(); |
| 248 | pendingAhead = pendingAhead > 0 ? pendingAhead : 0; |
| 249 | position += pendingAhead; |
| 250 | } |
| 251 | |
| 252 | /* |
| 253 | * we reach the end of the range we required, send status to datanode |
| 254 | * if datanode do not sending data anymore. |
| 255 | */ |
| 256 | |
| 257 | if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) { |
| 258 | sendStatus(); |
| 259 | } |
| 260 | } |
| 261 | |
| 262 | bool RemoteBlockReader::readTrailingEmptyPacket() { |
| 263 | shared_ptr<PacketHeader> = readPacketHeader(); |
| 264 | |
| 265 | if (!trailingHeader->isLastPacketInBlock() || trailingHeader->getDataLen() != 0) { |
| 266 | return false; |
| 267 | } |
| 268 | |
| 269 | return true; |
| 270 | } |
| 271 | |
| 272 | void RemoteBlockReader::sendStatus() { |
| 273 | ClientReadStatusProto status; |
| 274 | |
| 275 | if (verify) { |
| 276 | status.set_status(Status::DT_PROTO_CHECKSUM_OK); |
| 277 | } else { |
| 278 | status.set_status(Status::DT_PROTO_SUCCESS); |
| 279 | } |
| 280 | |
| 281 | WriteBuffer buffer; |
| 282 | int size = status.ByteSize(); |
| 283 | buffer.writeVarint32(size); |
| 284 | status.SerializeToArray(buffer.alloc(size), size); |
| 285 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout); |
| 286 | sentStatus = true; |
| 287 | } |
| 288 | |
| 289 | void RemoteBlockReader::verifyChecksum(int chunks) { |
| 290 | int dataSize = lastHeader->getDataLen(); |
| 291 | char * pchecksum = &buffer[0]; |
| 292 | char * pdata = &buffer[0] + (chunks * checksumSize); |
| 293 | |
| 294 | for (int i = 0; i < chunks; ++i) { |
| 295 | int size = chunkSize < dataSize ? chunkSize : dataSize; |
| 296 | dataSize -= size; |
| 297 | checksum->reset(); |
| 298 | checksum->update(pdata + (i * chunkSize), size); |
| 299 | uint32_t result = checksum->getValue(); |
| 300 | uint32_t target = ReadBigEndian32FromArray(pchecksum + (i * checksumSize)); |
| 301 | |
| 302 | if (result != target && size == chunkSize) { |
| 303 | THROW(ChecksumException, "RemoteBlockReader: checksum not match for Block: %s, on Datanode: %s" , |
| 304 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 305 | } |
| 306 | } |
| 307 | |
| 308 | assert(0 == dataSize); |
| 309 | } |
| 310 | |
| 311 | int64_t RemoteBlockReader::available() { |
| 312 | return size - position > 0 ? size - position : 0; |
| 313 | } |
| 314 | |
| 315 | int32_t RemoteBlockReader::read(char * buf, int32_t len) { |
| 316 | assert(0 != len && NULL != buf); |
| 317 | |
| 318 | if (cursor >= endOffset) { |
| 319 | THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s." , |
| 320 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
| 321 | } |
| 322 | |
| 323 | try { |
| 324 | if (position >= size) { |
| 325 | readNextPacket(); |
| 326 | } |
| 327 | |
| 328 | int32_t todo = len < size - position ? len : size - position; |
| 329 | memcpy(buf, &buffer[position], todo); |
| 330 | position += todo; |
| 331 | cursor += todo; |
| 332 | return todo; |
| 333 | } catch (const HdfsTimeoutException & e) { |
| 334 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
| 335 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 336 | } catch (const HdfsNetworkException & e) { |
| 337 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
| 338 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 339 | } |
| 340 | } |
| 341 | |
| 342 | void RemoteBlockReader::skip(int64_t len) { |
| 343 | int64_t todo = len; |
| 344 | assert(cursor + len <= endOffset); |
| 345 | |
| 346 | try { |
| 347 | while (todo > 0) { |
| 348 | if (cursor >= endOffset) { |
| 349 | THROW(HdfsIOException, "RemoteBlockReader: skip over block end from Datanode: %s, Block: %s." , |
| 350 | datanode.formatAddress().c_str(), binfo.toString().c_str()); |
| 351 | } |
| 352 | |
| 353 | if (position >= size) { |
| 354 | readNextPacket(); |
| 355 | } |
| 356 | |
| 357 | int batch = size - position; |
| 358 | batch = batch < todo ? batch : static_cast<int>(todo); |
| 359 | position += batch; |
| 360 | cursor += batch; |
| 361 | todo -= batch; |
| 362 | } |
| 363 | } catch (const HdfsTimeoutException & e) { |
| 364 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
| 365 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 366 | } catch (const HdfsNetworkException & e) { |
| 367 | NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s." , |
| 368 | binfo.toString().c_str(), datanode.formatAddress().c_str()); |
| 369 | } |
| 370 | } |
| 371 | |
| 372 | } |
| 373 | } |
| 374 | |