| 1 | /******************************************************************** |
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Author: Zhanwei Wang |
| 6 | ********************************************************************/ |
| 7 | /******************************************************************** |
| 8 | * 2014 - |
| 9 | * open source under Apache License Version 2.0 |
| 10 | ********************************************************************/ |
| 11 | /** |
| 12 | * Licensed to the Apache Software Foundation (ASF) under one |
| 13 | * or more contributor license agreements. See the NOTICE file |
| 14 | * distributed with this work for additional information |
| 15 | * regarding copyright ownership. The ASF licenses this file |
| 16 | * to you under the Apache License, Version 2.0 (the |
| 17 | * "License"); you may not use this file except in compliance |
| 18 | * with the License. You may obtain a copy of the License at |
| 19 | * |
| 20 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | * |
| 22 | * Unless required by applicable law or agreed to in writing, software |
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | * See the License for the specific language governing permissions and |
| 26 | * limitations under the License. |
| 27 | */ |
| 28 | #include "BigEndian.h" |
| 29 | #include "datatransfer.pb.h" |
| 30 | #include "Exception.h" |
| 31 | #include "ExceptionInternal.h" |
| 32 | #include "HWCrc32c.h" |
| 33 | #include "LocalBlockReader.h" |
| 34 | #include "SWCrc32c.h" |
| 35 | |
| 36 | #include <inttypes.h> |
| 37 | #include <limits> |
| 38 | |
/* Block meta file version this reader accepts; compared with the version read from the meta header. */
#define BMVERSION 1
/* Width in bytes of the version field at the start of the meta header. */
#define BMVERSION_SIZE 2

/*
 * Total size in bytes of the meta file header: the version field, followed by
 * the checksum type and the bytes-per-checksum field. Checksum records start
 * at this offset in the meta file.
 */
#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE)
| 43 | |
| 44 | namespace Hdfs { |
| 45 | namespace Internal { |
| 46 | |
| 47 | LocalBlockReader::LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info, |
| 48 | const ExtendedBlock& block, int64_t offset, |
| 49 | bool verify, SessionConfig& conf, |
| 50 | std::vector<char>& buffer) |
| 51 | : verify(verify), |
| 52 | pbuffer(NULL), |
| 53 | pMetaBuffer(NULL), |
| 54 | block(block), |
| 55 | checksumSize(0), |
| 56 | chunkSize(0), |
| 57 | position(0), |
| 58 | size(0), |
| 59 | cursor(0), |
| 60 | length(block.getNumBytes()), |
| 61 | info(info), |
| 62 | buffer(buffer) { |
| 63 | try { |
| 64 | metaFd = info->getMetaFile(); |
| 65 | dataFd = info->getDataFile(); |
| 66 | |
| 67 | std::vector<char> ; |
| 68 | pMetaBuffer = metaFd->read(header, HEADER_SIZE); |
| 69 | int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]); |
| 70 | |
| 71 | if (BMVERSION != version) { |
| 72 | THROW(HdfsIOException, |
| 73 | "LocalBlockReader get an unmatched block, expected block version %d, real version is %d" , |
| 74 | BMVERSION, static_cast<int>(version)); |
| 75 | } |
| 76 | |
| 77 | switch (pMetaBuffer[BMVERSION_SIZE]) { |
| 78 | case ChecksumTypeProto::CHECKSUM_NULL: |
| 79 | this->verify = false; |
| 80 | checksumSize = 0; |
| 81 | metaFd.reset(); |
| 82 | break; |
| 83 | |
| 84 | case ChecksumTypeProto::CHECKSUM_CRC32: |
| 85 | THROW(HdfsIOException, |
| 86 | "LocalBlockReader does not support CRC32 checksum." ); |
| 87 | break; |
| 88 | |
| 89 | case ChecksumTypeProto::CHECKSUM_CRC32C: |
| 90 | if (HWCrc32c::available()) { |
| 91 | checksum = shared_ptr<Checksum>(new HWCrc32c()); |
| 92 | } else { |
| 93 | checksum = shared_ptr<Checksum>(new SWCrc32c()); |
| 94 | } |
| 95 | |
| 96 | chunkSize = ReadBigEndian32FromArray( |
| 97 | &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]); |
| 98 | checksumSize = sizeof(int32_t); |
| 99 | break; |
| 100 | |
| 101 | default: |
| 102 | THROW(HdfsIOException, |
| 103 | "LocalBlockReader cannot recognize checksum type: %d." , |
| 104 | static_cast<int>(pMetaBuffer[BMVERSION_SIZE])); |
| 105 | } |
| 106 | |
| 107 | if (verify && chunkSize <= 0) { |
| 108 | THROW(HdfsIOException, |
| 109 | "LocalBlockReader get an invalid checksum parameter, bytes per check: %d." , |
| 110 | chunkSize); |
| 111 | } |
| 112 | |
| 113 | localBufferSize = conf.getLocalReadBufferSize(); |
| 114 | |
| 115 | if (verify) { |
| 116 | localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize; |
| 117 | } |
| 118 | |
| 119 | if (offset > 0) { |
| 120 | skip(offset); |
| 121 | } |
| 122 | } catch (const HdfsCanceled & e) { |
| 123 | throw; |
| 124 | } catch (const HdfsException & e) { |
| 125 | NESTED_THROW(HdfsIOException, |
| 126 | "Failed to construct LocalBlockReader for block: %s." , |
| 127 | block.toString().c_str()); |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | LocalBlockReader::~LocalBlockReader() { |
| 132 | } |
| 133 | |
| 134 | void LocalBlockReader::readAndVerify(int32_t bufferSize) { |
| 135 | assert(true == verify); |
| 136 | assert(cursor % chunkSize == 0); |
| 137 | int chunks = (bufferSize + chunkSize - 1) / chunkSize; |
| 138 | pbuffer = dataFd->read(buffer, bufferSize); |
| 139 | pMetaBuffer = metaFd->read(metaBuffer, chunks * checksumSize); |
| 140 | |
| 141 | for (int i = 0; i < chunks; ++i) { |
| 142 | checksum->reset(); |
| 143 | int chunk = chunkSize; |
| 144 | |
| 145 | if (chunkSize * (i + 1) > bufferSize) { |
| 146 | chunk = bufferSize % chunkSize; |
| 147 | } |
| 148 | |
| 149 | checksum->update(&pbuffer[i * chunkSize], chunk); |
| 150 | uint32_t target = ReadBigEndian32FromArray( |
| 151 | &pMetaBuffer[i * checksumSize]); |
| 152 | |
| 153 | if (target != checksum->getValue()) { |
| 154 | THROW(ChecksumException, |
| 155 | "LocalBlockReader checksum not match for block: %s" , |
| 156 | block.toString().c_str()); |
| 157 | } |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | int32_t LocalBlockReader::readInternal(char * buf, int32_t len) { |
| 162 | int32_t todo = len; |
| 163 | |
| 164 | /* |
| 165 | * read from buffer. |
| 166 | */ |
| 167 | if (position < size) { |
| 168 | todo = todo < size - position ? todo : size - position; |
| 169 | memcpy(buf, &pbuffer[position], todo); |
| 170 | position += todo; |
| 171 | cursor += todo; |
| 172 | return todo; |
| 173 | } |
| 174 | |
| 175 | /* |
| 176 | * end of block |
| 177 | */ |
| 178 | todo = todo < length - cursor ? todo : length - cursor; |
| 179 | |
| 180 | if (0 == todo) { |
| 181 | return 0; |
| 182 | } |
| 183 | |
| 184 | /* |
| 185 | * bypass the buffer |
| 186 | */ |
| 187 | if (!verify |
| 188 | && (todo > localBufferSize || todo == length - cursor)) { |
| 189 | dataFd->copy(buf, todo); |
| 190 | cursor += todo; |
| 191 | return todo; |
| 192 | } |
| 193 | |
| 194 | /* |
| 195 | * fill buffer. |
| 196 | */ |
| 197 | int bufferSize = localBufferSize; |
| 198 | bufferSize = bufferSize < length - cursor ? bufferSize : length - cursor; |
| 199 | assert(bufferSize > 0); |
| 200 | |
| 201 | if (verify) { |
| 202 | readAndVerify(bufferSize); |
| 203 | } else { |
| 204 | pbuffer = dataFd->read(buffer, bufferSize); |
| 205 | } |
| 206 | |
| 207 | position = 0; |
| 208 | size = bufferSize; |
| 209 | assert(position < size); |
| 210 | return readInternal(buf, todo); |
| 211 | } |
| 212 | |
| 213 | int32_t LocalBlockReader::read(char * buf, int32_t size) { |
| 214 | try { |
| 215 | return readInternal(buf, size); |
| 216 | } catch (const HdfsCanceled & e) { |
| 217 | throw; |
| 218 | } catch (const HdfsException & e) { |
| 219 | info->setValid(false); |
| 220 | NESTED_THROW(HdfsIOException, |
| 221 | "LocalBlockReader failed to read from position: %" PRId64 ", length: %d, block: %s." , |
| 222 | cursor, size, block.toString().c_str()); |
| 223 | } |
| 224 | |
| 225 | assert(!"cannot reach here" ); |
| 226 | return 0; |
| 227 | } |
| 228 | |
| 229 | void LocalBlockReader::skip(int64_t len) { |
| 230 | assert(len < length - cursor); |
| 231 | |
| 232 | try { |
| 233 | int64_t todo = len; |
| 234 | |
| 235 | while (todo > 0) { |
| 236 | /* |
| 237 | * skip the data in buffer. |
| 238 | */ |
| 239 | if (size - position > 0) { |
| 240 | int batch = todo < size - position ? todo : size - position; |
| 241 | position += batch; |
| 242 | todo -= batch; |
| 243 | cursor += batch; |
| 244 | continue; |
| 245 | } |
| 246 | |
| 247 | if (verify) { |
| 248 | int64_t lastChunkSize = (cursor + todo) % chunkSize; |
| 249 | cursor = (cursor + todo) / chunkSize * chunkSize; |
| 250 | int64_t metaCursor = HEADER_SIZE |
| 251 | + checksumSize * (cursor / chunkSize); |
| 252 | metaFd->seek(metaCursor); |
| 253 | todo = lastChunkSize; |
| 254 | } else { |
| 255 | cursor += todo; |
| 256 | todo = 0; |
| 257 | } |
| 258 | |
| 259 | if (cursor > 0) { |
| 260 | dataFd->seek(cursor); |
| 261 | } |
| 262 | |
| 263 | /* |
| 264 | * fill buffer again and verify checksum |
| 265 | */ |
| 266 | if (todo > 0) { |
| 267 | assert(true == verify); |
| 268 | int bufferSize = localBufferSize; |
| 269 | bufferSize = |
| 270 | bufferSize < length - cursor ? |
| 271 | bufferSize : length - cursor; |
| 272 | readAndVerify(bufferSize); |
| 273 | position = 0; |
| 274 | size = bufferSize; |
| 275 | } |
| 276 | } |
| 277 | } catch (const HdfsCanceled & e) { |
| 278 | throw; |
| 279 | } catch (const HdfsException & e) { |
| 280 | info->setValid(false); |
| 281 | NESTED_THROW(HdfsIOException, |
| 282 | "LocalBlockReader failed to skip from position: %" PRId64 ", length: %d, block: %s." , |
| 283 | cursor, size, block.toString().c_str()); |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | } |
| 288 | } |
| 289 | |