1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "BigEndian.h" |
29 | #include "datatransfer.pb.h" |
30 | #include "Exception.h" |
31 | #include "ExceptionInternal.h" |
32 | #include "HWCrc32c.h" |
33 | #include "LocalBlockReader.h" |
34 | #include "SWCrc32c.h" |
35 | |
36 | #include <inttypes.h> |
37 | #include <limits> |
38 | |
/* Version number expected in the leading version field of a block meta file. */
#define BMVERSION 1
/* Size in bytes of that version field (it is read as a big-endian 16-bit value). */
#define BMVERSION_SIZE 2

/*
 * Size in bytes of the block meta file header: the version field, followed by
 * the checksum type field and the bytes-per-checksum field.
 */
#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE)
43 | |
44 | namespace Hdfs { |
45 | namespace Internal { |
46 | |
47 | LocalBlockReader::LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info, |
48 | const ExtendedBlock& block, int64_t offset, |
49 | bool verify, SessionConfig& conf, |
50 | std::vector<char>& buffer) |
51 | : verify(verify), |
52 | pbuffer(NULL), |
53 | pMetaBuffer(NULL), |
54 | block(block), |
55 | checksumSize(0), |
56 | chunkSize(0), |
57 | position(0), |
58 | size(0), |
59 | cursor(0), |
60 | length(block.getNumBytes()), |
61 | info(info), |
62 | buffer(buffer) { |
63 | try { |
64 | metaFd = info->getMetaFile(); |
65 | dataFd = info->getDataFile(); |
66 | |
67 | std::vector<char> ; |
68 | pMetaBuffer = metaFd->read(header, HEADER_SIZE); |
69 | int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]); |
70 | |
71 | if (BMVERSION != version) { |
72 | THROW(HdfsIOException, |
73 | "LocalBlockReader get an unmatched block, expected block version %d, real version is %d" , |
74 | BMVERSION, static_cast<int>(version)); |
75 | } |
76 | |
77 | switch (pMetaBuffer[BMVERSION_SIZE]) { |
78 | case ChecksumTypeProto::CHECKSUM_NULL: |
79 | this->verify = false; |
80 | checksumSize = 0; |
81 | metaFd.reset(); |
82 | break; |
83 | |
84 | case ChecksumTypeProto::CHECKSUM_CRC32: |
85 | THROW(HdfsIOException, |
86 | "LocalBlockReader does not support CRC32 checksum." ); |
87 | break; |
88 | |
89 | case ChecksumTypeProto::CHECKSUM_CRC32C: |
90 | if (HWCrc32c::available()) { |
91 | checksum = shared_ptr<Checksum>(new HWCrc32c()); |
92 | } else { |
93 | checksum = shared_ptr<Checksum>(new SWCrc32c()); |
94 | } |
95 | |
96 | chunkSize = ReadBigEndian32FromArray( |
97 | &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]); |
98 | checksumSize = sizeof(int32_t); |
99 | break; |
100 | |
101 | default: |
102 | THROW(HdfsIOException, |
103 | "LocalBlockReader cannot recognize checksum type: %d." , |
104 | static_cast<int>(pMetaBuffer[BMVERSION_SIZE])); |
105 | } |
106 | |
107 | if (verify && chunkSize <= 0) { |
108 | THROW(HdfsIOException, |
109 | "LocalBlockReader get an invalid checksum parameter, bytes per check: %d." , |
110 | chunkSize); |
111 | } |
112 | |
113 | localBufferSize = conf.getLocalReadBufferSize(); |
114 | |
115 | if (verify) { |
116 | localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize; |
117 | } |
118 | |
119 | if (offset > 0) { |
120 | skip(offset); |
121 | } |
122 | } catch (const HdfsCanceled & e) { |
123 | throw; |
124 | } catch (const HdfsException & e) { |
125 | NESTED_THROW(HdfsIOException, |
126 | "Failed to construct LocalBlockReader for block: %s." , |
127 | block.toString().c_str()); |
128 | } |
129 | } |
130 | |
131 | LocalBlockReader::~LocalBlockReader() { |
132 | } |
133 | |
134 | void LocalBlockReader::readAndVerify(int32_t bufferSize) { |
135 | assert(true == verify); |
136 | assert(cursor % chunkSize == 0); |
137 | int chunks = (bufferSize + chunkSize - 1) / chunkSize; |
138 | pbuffer = dataFd->read(buffer, bufferSize); |
139 | pMetaBuffer = metaFd->read(metaBuffer, chunks * checksumSize); |
140 | |
141 | for (int i = 0; i < chunks; ++i) { |
142 | checksum->reset(); |
143 | int chunk = chunkSize; |
144 | |
145 | if (chunkSize * (i + 1) > bufferSize) { |
146 | chunk = bufferSize % chunkSize; |
147 | } |
148 | |
149 | checksum->update(&pbuffer[i * chunkSize], chunk); |
150 | uint32_t target = ReadBigEndian32FromArray( |
151 | &pMetaBuffer[i * checksumSize]); |
152 | |
153 | if (target != checksum->getValue()) { |
154 | THROW(ChecksumException, |
155 | "LocalBlockReader checksum not match for block: %s" , |
156 | block.toString().c_str()); |
157 | } |
158 | } |
159 | } |
160 | |
161 | int32_t LocalBlockReader::readInternal(char * buf, int32_t len) { |
162 | int32_t todo = len; |
163 | |
164 | /* |
165 | * read from buffer. |
166 | */ |
167 | if (position < size) { |
168 | todo = todo < size - position ? todo : size - position; |
169 | memcpy(buf, &pbuffer[position], todo); |
170 | position += todo; |
171 | cursor += todo; |
172 | return todo; |
173 | } |
174 | |
175 | /* |
176 | * end of block |
177 | */ |
178 | todo = todo < length - cursor ? todo : length - cursor; |
179 | |
180 | if (0 == todo) { |
181 | return 0; |
182 | } |
183 | |
184 | /* |
185 | * bypass the buffer |
186 | */ |
187 | if (!verify |
188 | && (todo > localBufferSize || todo == length - cursor)) { |
189 | dataFd->copy(buf, todo); |
190 | cursor += todo; |
191 | return todo; |
192 | } |
193 | |
194 | /* |
195 | * fill buffer. |
196 | */ |
197 | int bufferSize = localBufferSize; |
198 | bufferSize = bufferSize < length - cursor ? bufferSize : length - cursor; |
199 | assert(bufferSize > 0); |
200 | |
201 | if (verify) { |
202 | readAndVerify(bufferSize); |
203 | } else { |
204 | pbuffer = dataFd->read(buffer, bufferSize); |
205 | } |
206 | |
207 | position = 0; |
208 | size = bufferSize; |
209 | assert(position < size); |
210 | return readInternal(buf, todo); |
211 | } |
212 | |
213 | int32_t LocalBlockReader::read(char * buf, int32_t size) { |
214 | try { |
215 | return readInternal(buf, size); |
216 | } catch (const HdfsCanceled & e) { |
217 | throw; |
218 | } catch (const HdfsException & e) { |
219 | info->setValid(false); |
220 | NESTED_THROW(HdfsIOException, |
221 | "LocalBlockReader failed to read from position: %" PRId64 ", length: %d, block: %s." , |
222 | cursor, size, block.toString().c_str()); |
223 | } |
224 | |
225 | assert(!"cannot reach here" ); |
226 | return 0; |
227 | } |
228 | |
229 | void LocalBlockReader::skip(int64_t len) { |
230 | assert(len < length - cursor); |
231 | |
232 | try { |
233 | int64_t todo = len; |
234 | |
235 | while (todo > 0) { |
236 | /* |
237 | * skip the data in buffer. |
238 | */ |
239 | if (size - position > 0) { |
240 | int batch = todo < size - position ? todo : size - position; |
241 | position += batch; |
242 | todo -= batch; |
243 | cursor += batch; |
244 | continue; |
245 | } |
246 | |
247 | if (verify) { |
248 | int64_t lastChunkSize = (cursor + todo) % chunkSize; |
249 | cursor = (cursor + todo) / chunkSize * chunkSize; |
250 | int64_t metaCursor = HEADER_SIZE |
251 | + checksumSize * (cursor / chunkSize); |
252 | metaFd->seek(metaCursor); |
253 | todo = lastChunkSize; |
254 | } else { |
255 | cursor += todo; |
256 | todo = 0; |
257 | } |
258 | |
259 | if (cursor > 0) { |
260 | dataFd->seek(cursor); |
261 | } |
262 | |
263 | /* |
264 | * fill buffer again and verify checksum |
265 | */ |
266 | if (todo > 0) { |
267 | assert(true == verify); |
268 | int bufferSize = localBufferSize; |
269 | bufferSize = |
270 | bufferSize < length - cursor ? |
271 | bufferSize : length - cursor; |
272 | readAndVerify(bufferSize); |
273 | position = 0; |
274 | size = bufferSize; |
275 | } |
276 | } |
277 | } catch (const HdfsCanceled & e) { |
278 | throw; |
279 | } catch (const HdfsException & e) { |
280 | info->setValid(false); |
281 | NESTED_THROW(HdfsIOException, |
282 | "LocalBlockReader failed to skip from position: %" PRId64 ", length: %d, block: %s." , |
283 | cursor, size, block.toString().c_str()); |
284 | } |
285 | } |
286 | |
287 | } |
288 | } |
289 | |