1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "BigEndian.h"
29#include "datatransfer.pb.h"
30#include "Exception.h"
31#include "ExceptionInternal.h"
32#include "HWCrc32c.h"
33#include "LocalBlockReader.h"
34#include "SWCrc32c.h"
35
36#include <inttypes.h>
37#include <limits>
38
/* Block meta file version this reader accepts; compared against the
 * big-endian 16-bit value at the start of the meta file. */
#define BMVERSION 1
/* Width in bytes of the version field at the start of the meta file header. */
#define BMVERSION_SIZE 2

/* Total meta file header size: version field + checksum type field +
 * bytes-per-checksum field. CHECKSUM_TYPE_SIZE and
 * CHECKSUM_BYTES_PER_CHECKSUM_SIZE are defined outside this file
 * (presumably LocalBlockReader.h - TODO confirm). */
#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE)
43
44namespace Hdfs {
45namespace Internal {
46
47LocalBlockReader::LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info,
48 const ExtendedBlock& block, int64_t offset,
49 bool verify, SessionConfig& conf,
50 std::vector<char>& buffer)
51 : verify(verify),
52 pbuffer(NULL),
53 pMetaBuffer(NULL),
54 block(block),
55 checksumSize(0),
56 chunkSize(0),
57 position(0),
58 size(0),
59 cursor(0),
60 length(block.getNumBytes()),
61 info(info),
62 buffer(buffer) {
63 try {
64 metaFd = info->getMetaFile();
65 dataFd = info->getDataFile();
66
67 std::vector<char> header;
68 pMetaBuffer = metaFd->read(header, HEADER_SIZE);
69 int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]);
70
71 if (BMVERSION != version) {
72 THROW(HdfsIOException,
73 "LocalBlockReader get an unmatched block, expected block version %d, real version is %d",
74 BMVERSION, static_cast<int>(version));
75 }
76
77 switch (pMetaBuffer[BMVERSION_SIZE]) {
78 case ChecksumTypeProto::CHECKSUM_NULL:
79 this->verify = false;
80 checksumSize = 0;
81 metaFd.reset();
82 break;
83
84 case ChecksumTypeProto::CHECKSUM_CRC32:
85 THROW(HdfsIOException,
86 "LocalBlockReader does not support CRC32 checksum.");
87 break;
88
89 case ChecksumTypeProto::CHECKSUM_CRC32C:
90 if (HWCrc32c::available()) {
91 checksum = shared_ptr<Checksum>(new HWCrc32c());
92 } else {
93 checksum = shared_ptr<Checksum>(new SWCrc32c());
94 }
95
96 chunkSize = ReadBigEndian32FromArray(
97 &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]);
98 checksumSize = sizeof(int32_t);
99 break;
100
101 default:
102 THROW(HdfsIOException,
103 "LocalBlockReader cannot recognize checksum type: %d.",
104 static_cast<int>(pMetaBuffer[BMVERSION_SIZE]));
105 }
106
107 if (verify && chunkSize <= 0) {
108 THROW(HdfsIOException,
109 "LocalBlockReader get an invalid checksum parameter, bytes per check: %d.",
110 chunkSize);
111 }
112
113 localBufferSize = conf.getLocalReadBufferSize();
114
115 if (verify) {
116 localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize;
117 }
118
119 if (offset > 0) {
120 skip(offset);
121 }
122 } catch (const HdfsCanceled & e) {
123 throw;
124 } catch (const HdfsException & e) {
125 NESTED_THROW(HdfsIOException,
126 "Failed to construct LocalBlockReader for block: %s.",
127 block.toString().c_str());
128 }
129}
130
131LocalBlockReader::~LocalBlockReader() {
132}
133
134void LocalBlockReader::readAndVerify(int32_t bufferSize) {
135 assert(true == verify);
136 assert(cursor % chunkSize == 0);
137 int chunks = (bufferSize + chunkSize - 1) / chunkSize;
138 pbuffer = dataFd->read(buffer, bufferSize);
139 pMetaBuffer = metaFd->read(metaBuffer, chunks * checksumSize);
140
141 for (int i = 0; i < chunks; ++i) {
142 checksum->reset();
143 int chunk = chunkSize;
144
145 if (chunkSize * (i + 1) > bufferSize) {
146 chunk = bufferSize % chunkSize;
147 }
148
149 checksum->update(&pbuffer[i * chunkSize], chunk);
150 uint32_t target = ReadBigEndian32FromArray(
151 &pMetaBuffer[i * checksumSize]);
152
153 if (target != checksum->getValue()) {
154 THROW(ChecksumException,
155 "LocalBlockReader checksum not match for block: %s",
156 block.toString().c_str());
157 }
158 }
159}
160
161int32_t LocalBlockReader::readInternal(char * buf, int32_t len) {
162 int32_t todo = len;
163
164 /*
165 * read from buffer.
166 */
167 if (position < size) {
168 todo = todo < size - position ? todo : size - position;
169 memcpy(buf, &pbuffer[position], todo);
170 position += todo;
171 cursor += todo;
172 return todo;
173 }
174
175 /*
176 * end of block
177 */
178 todo = todo < length - cursor ? todo : length - cursor;
179
180 if (0 == todo) {
181 return 0;
182 }
183
184 /*
185 * bypass the buffer
186 */
187 if (!verify
188 && (todo > localBufferSize || todo == length - cursor)) {
189 dataFd->copy(buf, todo);
190 cursor += todo;
191 return todo;
192 }
193
194 /*
195 * fill buffer.
196 */
197 int bufferSize = localBufferSize;
198 bufferSize = bufferSize < length - cursor ? bufferSize : length - cursor;
199 assert(bufferSize > 0);
200
201 if (verify) {
202 readAndVerify(bufferSize);
203 } else {
204 pbuffer = dataFd->read(buffer, bufferSize);
205 }
206
207 position = 0;
208 size = bufferSize;
209 assert(position < size);
210 return readInternal(buf, todo);
211}
212
213int32_t LocalBlockReader::read(char * buf, int32_t size) {
214 try {
215 return readInternal(buf, size);
216 } catch (const HdfsCanceled & e) {
217 throw;
218 } catch (const HdfsException & e) {
219 info->setValid(false);
220 NESTED_THROW(HdfsIOException,
221 "LocalBlockReader failed to read from position: %" PRId64 ", length: %d, block: %s.",
222 cursor, size, block.toString().c_str());
223 }
224
225 assert(!"cannot reach here");
226 return 0;
227}
228
229void LocalBlockReader::skip(int64_t len) {
230 assert(len < length - cursor);
231
232 try {
233 int64_t todo = len;
234
235 while (todo > 0) {
236 /*
237 * skip the data in buffer.
238 */
239 if (size - position > 0) {
240 int batch = todo < size - position ? todo : size - position;
241 position += batch;
242 todo -= batch;
243 cursor += batch;
244 continue;
245 }
246
247 if (verify) {
248 int64_t lastChunkSize = (cursor + todo) % chunkSize;
249 cursor = (cursor + todo) / chunkSize * chunkSize;
250 int64_t metaCursor = HEADER_SIZE
251 + checksumSize * (cursor / chunkSize);
252 metaFd->seek(metaCursor);
253 todo = lastChunkSize;
254 } else {
255 cursor += todo;
256 todo = 0;
257 }
258
259 if (cursor > 0) {
260 dataFd->seek(cursor);
261 }
262
263 /*
264 * fill buffer again and verify checksum
265 */
266 if (todo > 0) {
267 assert(true == verify);
268 int bufferSize = localBufferSize;
269 bufferSize =
270 bufferSize < length - cursor ?
271 bufferSize : length - cursor;
272 readAndVerify(bufferSize);
273 position = 0;
274 size = bufferSize;
275 }
276 }
277 } catch (const HdfsCanceled & e) {
278 throw;
279 } catch (const HdfsException & e) {
280 info->setValid(false);
281 NESTED_THROW(HdfsIOException,
282 "LocalBlockReader failed to skip from position: %" PRId64 ", length: %d, block: %s.",
283 cursor, size, block.toString().c_str());
284 }
285}
286
287}
288}
289