| 1 | /******************************************************************** | 
|---|
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. | 
|---|
| 3 | * All rights reserved. | 
|---|
| 4 | * | 
|---|
| 5 | * Author: Zhanwei Wang | 
|---|
| 6 | ********************************************************************/ | 
|---|
| 7 | /******************************************************************** | 
|---|
| 8 | * 2014 - | 
|---|
| 9 | * open source under Apache License Version 2.0 | 
|---|
| 10 | ********************************************************************/ | 
|---|
| 11 | /** | 
|---|
| 12 | * Licensed to the Apache Software Foundation (ASF) under one | 
|---|
| 13 | * or more contributor license agreements.  See the NOTICE file | 
|---|
| 14 | * distributed with this work for additional information | 
|---|
| 15 | * regarding copyright ownership.  The ASF licenses this file | 
|---|
| 16 | * to you under the Apache License, Version 2.0 (the | 
|---|
| 17 | * "License"); you may not use this file except in compliance | 
|---|
| 18 | * with the License.  You may obtain a copy of the License at | 
|---|
| 19 | * | 
|---|
| 20 | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|---|
| 21 | * | 
|---|
| 22 | * Unless required by applicable law or agreed to in writing, software | 
|---|
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, | 
|---|
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|---|
| 25 | * See the License for the specific language governing permissions and | 
|---|
| 26 | * limitations under the License. | 
|---|
| 27 | */ | 
|---|
| 28 | #include "BigEndian.h" | 
|---|
| 29 | #include "datatransfer.pb.h" | 
|---|
| 30 | #include "Exception.h" | 
|---|
| 31 | #include "ExceptionInternal.h" | 
|---|
| 32 | #include "HWCrc32c.h" | 
|---|
| 33 | #include "LocalBlockReader.h" | 
|---|
| 34 | #include "SWCrc32c.h" | 
|---|
| 35 |  | 
|---|
| 36 | #include <inttypes.h> | 
|---|
| 37 | #include <limits> | 
|---|
| 38 |  | 
|---|
/* Block meta file version this reader understands; compared against the
 * big-endian 16-bit value at the start of the meta file. */
#define BMVERSION 1
/* Size in bytes of the version field at the start of the meta file header. */
#define BMVERSION_SIZE 2

/* Total meta file header size: version field, checksum type field and
 * bytes-per-checksum field. */
#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE)
|---|
| 43 |  | 
|---|
| 44 | namespace Hdfs { | 
|---|
| 45 | namespace Internal { | 
|---|
| 46 |  | 
|---|
| 47 | LocalBlockReader::LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info, | 
|---|
| 48 | const ExtendedBlock& block, int64_t offset, | 
|---|
| 49 | bool verify, SessionConfig& conf, | 
|---|
| 50 | std::vector<char>& buffer) | 
|---|
| 51 | : verify(verify), | 
|---|
| 52 | pbuffer(NULL), | 
|---|
| 53 | pMetaBuffer(NULL), | 
|---|
| 54 | block(block), | 
|---|
| 55 | checksumSize(0), | 
|---|
| 56 | chunkSize(0), | 
|---|
| 57 | position(0), | 
|---|
| 58 | size(0), | 
|---|
| 59 | cursor(0), | 
|---|
| 60 | length(block.getNumBytes()), | 
|---|
| 61 | info(info), | 
|---|
| 62 | buffer(buffer) { | 
|---|
| 63 | try { | 
|---|
| 64 | metaFd = info->getMetaFile(); | 
|---|
| 65 | dataFd = info->getDataFile(); | 
|---|
| 66 |  | 
|---|
| 67 | std::vector<char> ; | 
|---|
| 68 | pMetaBuffer = metaFd->read(header, HEADER_SIZE); | 
|---|
| 69 | int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]); | 
|---|
| 70 |  | 
|---|
| 71 | if (BMVERSION != version) { | 
|---|
| 72 | THROW(HdfsIOException, | 
|---|
| 73 | "LocalBlockReader get an unmatched block, expected block version %d, real version is %d", | 
|---|
| 74 | BMVERSION, static_cast<int>(version)); | 
|---|
| 75 | } | 
|---|
| 76 |  | 
|---|
| 77 | switch (pMetaBuffer[BMVERSION_SIZE]) { | 
|---|
| 78 | case ChecksumTypeProto::CHECKSUM_NULL: | 
|---|
| 79 | this->verify = false; | 
|---|
| 80 | checksumSize = 0; | 
|---|
| 81 | metaFd.reset(); | 
|---|
| 82 | break; | 
|---|
| 83 |  | 
|---|
| 84 | case ChecksumTypeProto::CHECKSUM_CRC32: | 
|---|
| 85 | THROW(HdfsIOException, | 
|---|
| 86 | "LocalBlockReader does not support CRC32 checksum."); | 
|---|
| 87 | break; | 
|---|
| 88 |  | 
|---|
| 89 | case ChecksumTypeProto::CHECKSUM_CRC32C: | 
|---|
| 90 | if (HWCrc32c::available()) { | 
|---|
| 91 | checksum = shared_ptr<Checksum>(new HWCrc32c()); | 
|---|
| 92 | } else { | 
|---|
| 93 | checksum = shared_ptr<Checksum>(new SWCrc32c()); | 
|---|
| 94 | } | 
|---|
| 95 |  | 
|---|
| 96 | chunkSize = ReadBigEndian32FromArray( | 
|---|
| 97 | &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]); | 
|---|
| 98 | checksumSize = sizeof(int32_t); | 
|---|
| 99 | break; | 
|---|
| 100 |  | 
|---|
| 101 | default: | 
|---|
| 102 | THROW(HdfsIOException, | 
|---|
| 103 | "LocalBlockReader cannot recognize checksum type: %d.", | 
|---|
| 104 | static_cast<int>(pMetaBuffer[BMVERSION_SIZE])); | 
|---|
| 105 | } | 
|---|
| 106 |  | 
|---|
| 107 | if (verify && chunkSize <= 0) { | 
|---|
| 108 | THROW(HdfsIOException, | 
|---|
| 109 | "LocalBlockReader get an invalid checksum parameter, bytes per check: %d.", | 
|---|
| 110 | chunkSize); | 
|---|
| 111 | } | 
|---|
| 112 |  | 
|---|
| 113 | localBufferSize = conf.getLocalReadBufferSize(); | 
|---|
| 114 |  | 
|---|
| 115 | if (verify) { | 
|---|
| 116 | localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize; | 
|---|
| 117 | } | 
|---|
| 118 |  | 
|---|
| 119 | if (offset > 0) { | 
|---|
| 120 | skip(offset); | 
|---|
| 121 | } | 
|---|
| 122 | } catch (const HdfsCanceled & e) { | 
|---|
| 123 | throw; | 
|---|
| 124 | } catch (const HdfsException & e) { | 
|---|
| 125 | NESTED_THROW(HdfsIOException, | 
|---|
| 126 | "Failed to construct LocalBlockReader for block: %s.", | 
|---|
| 127 | block.toString().c_str()); | 
|---|
| 128 | } | 
|---|
| 129 | } | 
|---|
| 130 |  | 
|---|
| 131 | LocalBlockReader::~LocalBlockReader() { | 
|---|
| 132 | } | 
|---|
| 133 |  | 
|---|
| 134 | void LocalBlockReader::readAndVerify(int32_t bufferSize) { | 
|---|
| 135 | assert(true == verify); | 
|---|
| 136 | assert(cursor % chunkSize == 0); | 
|---|
| 137 | int chunks = (bufferSize + chunkSize - 1) / chunkSize; | 
|---|
| 138 | pbuffer = dataFd->read(buffer, bufferSize); | 
|---|
| 139 | pMetaBuffer = metaFd->read(metaBuffer, chunks * checksumSize); | 
|---|
| 140 |  | 
|---|
| 141 | for (int i = 0; i < chunks; ++i) { | 
|---|
| 142 | checksum->reset(); | 
|---|
| 143 | int chunk = chunkSize; | 
|---|
| 144 |  | 
|---|
| 145 | if (chunkSize * (i + 1) > bufferSize) { | 
|---|
| 146 | chunk = bufferSize % chunkSize; | 
|---|
| 147 | } | 
|---|
| 148 |  | 
|---|
| 149 | checksum->update(&pbuffer[i * chunkSize], chunk); | 
|---|
| 150 | uint32_t target = ReadBigEndian32FromArray( | 
|---|
| 151 | &pMetaBuffer[i * checksumSize]); | 
|---|
| 152 |  | 
|---|
| 153 | if (target != checksum->getValue()) { | 
|---|
| 154 | THROW(ChecksumException, | 
|---|
| 155 | "LocalBlockReader checksum not match for block: %s", | 
|---|
| 156 | block.toString().c_str()); | 
|---|
| 157 | } | 
|---|
| 158 | } | 
|---|
| 159 | } | 
|---|
| 160 |  | 
|---|
| 161 | int32_t LocalBlockReader::readInternal(char * buf, int32_t len) { | 
|---|
| 162 | int32_t todo = len; | 
|---|
| 163 |  | 
|---|
| 164 | /* | 
|---|
| 165 | * read from buffer. | 
|---|
| 166 | */ | 
|---|
| 167 | if (position < size) { | 
|---|
| 168 | todo = todo < size - position ? todo : size - position; | 
|---|
| 169 | memcpy(buf, &pbuffer[position], todo); | 
|---|
| 170 | position += todo; | 
|---|
| 171 | cursor += todo; | 
|---|
| 172 | return todo; | 
|---|
| 173 | } | 
|---|
| 174 |  | 
|---|
| 175 | /* | 
|---|
| 176 | * end of block | 
|---|
| 177 | */ | 
|---|
| 178 | todo = todo < length - cursor ? todo : length - cursor; | 
|---|
| 179 |  | 
|---|
| 180 | if (0 == todo) { | 
|---|
| 181 | return 0; | 
|---|
| 182 | } | 
|---|
| 183 |  | 
|---|
| 184 | /* | 
|---|
| 185 | * bypass the buffer | 
|---|
| 186 | */ | 
|---|
| 187 | if (!verify | 
|---|
| 188 | && (todo > localBufferSize || todo == length - cursor)) { | 
|---|
| 189 | dataFd->copy(buf, todo); | 
|---|
| 190 | cursor += todo; | 
|---|
| 191 | return todo; | 
|---|
| 192 | } | 
|---|
| 193 |  | 
|---|
| 194 | /* | 
|---|
| 195 | * fill buffer. | 
|---|
| 196 | */ | 
|---|
| 197 | int bufferSize = localBufferSize; | 
|---|
| 198 | bufferSize = bufferSize < length - cursor ? bufferSize : length - cursor; | 
|---|
| 199 | assert(bufferSize > 0); | 
|---|
| 200 |  | 
|---|
| 201 | if (verify) { | 
|---|
| 202 | readAndVerify(bufferSize); | 
|---|
| 203 | } else { | 
|---|
| 204 | pbuffer = dataFd->read(buffer, bufferSize); | 
|---|
| 205 | } | 
|---|
| 206 |  | 
|---|
| 207 | position = 0; | 
|---|
| 208 | size = bufferSize; | 
|---|
| 209 | assert(position < size); | 
|---|
| 210 | return readInternal(buf, todo); | 
|---|
| 211 | } | 
|---|
| 212 |  | 
|---|
| 213 | int32_t LocalBlockReader::read(char * buf, int32_t size) { | 
|---|
| 214 | try { | 
|---|
| 215 | return readInternal(buf, size); | 
|---|
| 216 | } catch (const HdfsCanceled & e) { | 
|---|
| 217 | throw; | 
|---|
| 218 | } catch (const HdfsException & e) { | 
|---|
| 219 | info->setValid(false); | 
|---|
| 220 | NESTED_THROW(HdfsIOException, | 
|---|
| 221 | "LocalBlockReader failed to read from position: %"PRId64 ", length: %d, block: %s.", | 
|---|
| 222 | cursor, size, block.toString().c_str()); | 
|---|
| 223 | } | 
|---|
| 224 |  | 
|---|
| 225 | assert(! "cannot reach here"); | 
|---|
| 226 | return 0; | 
|---|
| 227 | } | 
|---|
| 228 |  | 
|---|
| 229 | void LocalBlockReader::skip(int64_t len) { | 
|---|
| 230 | assert(len < length - cursor); | 
|---|
| 231 |  | 
|---|
| 232 | try { | 
|---|
| 233 | int64_t todo = len; | 
|---|
| 234 |  | 
|---|
| 235 | while (todo > 0) { | 
|---|
| 236 | /* | 
|---|
| 237 | * skip the data in buffer. | 
|---|
| 238 | */ | 
|---|
| 239 | if (size - position > 0) { | 
|---|
| 240 | int batch = todo < size - position ? todo : size - position; | 
|---|
| 241 | position += batch; | 
|---|
| 242 | todo -= batch; | 
|---|
| 243 | cursor += batch; | 
|---|
| 244 | continue; | 
|---|
| 245 | } | 
|---|
| 246 |  | 
|---|
| 247 | if (verify) { | 
|---|
| 248 | int64_t lastChunkSize = (cursor + todo) % chunkSize; | 
|---|
| 249 | cursor = (cursor + todo) / chunkSize * chunkSize; | 
|---|
| 250 | int64_t metaCursor = HEADER_SIZE | 
|---|
| 251 | + checksumSize * (cursor / chunkSize); | 
|---|
| 252 | metaFd->seek(metaCursor); | 
|---|
| 253 | todo = lastChunkSize; | 
|---|
| 254 | } else { | 
|---|
| 255 | cursor += todo; | 
|---|
| 256 | todo = 0; | 
|---|
| 257 | } | 
|---|
| 258 |  | 
|---|
| 259 | if (cursor > 0) { | 
|---|
| 260 | dataFd->seek(cursor); | 
|---|
| 261 | } | 
|---|
| 262 |  | 
|---|
| 263 | /* | 
|---|
| 264 | * fill buffer again and verify checksum | 
|---|
| 265 | */ | 
|---|
| 266 | if (todo > 0) { | 
|---|
| 267 | assert(true == verify); | 
|---|
| 268 | int bufferSize = localBufferSize; | 
|---|
| 269 | bufferSize = | 
|---|
| 270 | bufferSize < length - cursor ? | 
|---|
| 271 | bufferSize : length - cursor; | 
|---|
| 272 | readAndVerify(bufferSize); | 
|---|
| 273 | position = 0; | 
|---|
| 274 | size = bufferSize; | 
|---|
| 275 | } | 
|---|
| 276 | } | 
|---|
| 277 | } catch (const HdfsCanceled & e) { | 
|---|
| 278 | throw; | 
|---|
| 279 | } catch (const HdfsException & e) { | 
|---|
| 280 | info->setValid(false); | 
|---|
| 281 | NESTED_THROW(HdfsIOException, | 
|---|
| 282 | "LocalBlockReader failed to skip from position: %"PRId64 ", length: %d, block: %s.", | 
|---|
| 283 | cursor, size, block.toString().c_str()); | 
|---|
| 284 | } | 
|---|
| 285 | } | 
|---|
| 286 |  | 
|---|
| 287 | } | 
|---|
| 288 | } | 
|---|
| 289 |  | 
|---|