1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "client/DataTransferProtocolSender.h" |
29 | #include "ReadShortCircuitInfo.h" |
30 | #include "server/Datanode.h" |
31 | #include "datatransfer.pb.h" |
32 | #include "Exception.h" |
33 | #include "ExceptionInternal.h" |
34 | #include "network/DomainSocket.h" |
35 | #include "SWCrc32c.h" |
36 | #include "HWCrc32c.h" |
37 | #include "StringUtil.h" |
38 | |
39 | #include <inttypes.h> |
40 | #include <sstream> |
41 | #include <vector> |
42 | |
43 | namespace Hdfs { |
44 | namespace Internal { |
45 | |
46 | ReadShortCircuitFDCacheType |
47 | ReadShortCircuitInfoBuilder::ReadShortCircuitFDCache; |
48 | BlockLocalPathInfoCacheType |
49 | ReadShortCircuitInfoBuilder::BlockLocalPathInfoCache; |
50 | |
/**
 * Destroys a short-circuit read handle.
 *
 * The data and metadata file wrappers are dropped first; the object is then
 * passed to ReadShortCircuitInfoBuilder::release(), which puts its descriptor
 * holder back into the shared FD cache when the handle is valid and not legacy.
 * Every exception is swallowed because a destructor must not throw.
 */
ReadShortCircuitInfo::~ReadShortCircuitInfo() {
    try {
        // Reset the wrappers before release() so they are gone before the
        // descriptors can be handed back to the cache (presumably so a later
        // reader does not share them with a still-live wrapper — confirm).
        dataFile.reset();
        metaFile.reset();
        ReadShortCircuitInfoBuilder::release(*this);
    } catch (...) {
        // Deliberately ignored: exceptions must not escape a destructor.
    }
}
59 | |
60 | ReadShortCircuitFDHolder::~ReadShortCircuitFDHolder() { |
61 | if (metafd != -1) { |
62 | ::close(metafd); |
63 | } |
64 | |
65 | if (datafd != -1) { |
66 | ::close(datafd); |
67 | } |
68 | } |
69 | |
70 | ReadShortCircuitInfoBuilder::ReadShortCircuitInfoBuilder( |
71 | const DatanodeInfo& dnInfo, const RpcAuth& auth, const SessionConfig& conf) |
72 | : dnInfo(dnInfo), auth(auth), conf(conf) {} |
73 | |
74 | shared_ptr<ReadShortCircuitInfo> ReadShortCircuitInfoBuilder::fetchOrCreate( |
75 | const ExtendedBlock& block, const Token token) { |
76 | shared_ptr<ReadShortCircuitInfo> retval; |
77 | ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(), |
78 | block.getPoolId()); |
79 | |
80 | if (conf.isLegacyLocalBlockReader()) { |
81 | if (auth.getProtocol() != AuthProtocol::NONE) { |
82 | LOG(WARNING, |
83 | "Legacy read-shortcircuit only works for simple " |
84 | "authentication" ); |
85 | return shared_ptr<ReadShortCircuitInfo>(); |
86 | } |
87 | |
88 | BlockLocalPathInfo info = getBlockLocalPathInfo(block, token); |
89 | assert(block.getBlockId() == info.getBlock().getBlockId() && |
90 | block.getPoolId() == info.getBlock().getPoolId()); |
91 | |
92 | if (0 != access(info.getLocalMetaPath(), R_OK)) { |
93 | invalidBlockLocalPathInfo(block); |
94 | LOG(WARNING, |
95 | "Legacy read-shortcircuit is enabled but path:%s is not " |
96 | "readable." , |
97 | info.getLocalMetaPath()); |
98 | return shared_ptr<ReadShortCircuitInfo>(); |
99 | } |
100 | |
101 | retval = createReadShortCircuitInfo(key, info); |
102 | } else { |
103 | shared_ptr<ReadShortCircuitFDHolder> fds; |
104 | |
105 | // find a pair available file descriptors in cache. |
106 | if (ReadShortCircuitFDCache.findAndErase(key, &fds)) { |
107 | try { |
108 | LOG(DEBUG1, |
109 | "Get file descriptors from cache for block %s, cache size %zu" , |
110 | block.toString().c_str(), ReadShortCircuitFDCache.size()); |
111 | |
112 | return createReadShortCircuitInfo(key, fds); |
113 | } catch (...) { |
114 | // failed to create file wrapper from fds, retry with new fds. |
115 | } |
116 | } |
117 | |
118 | // create a new one |
119 | retval = createReadShortCircuitInfo(key, block, token); |
120 | ReadShortCircuitFDCache.setMaxSize(conf.getMaxFileDescriptorCacheSize()); |
121 | } |
122 | |
123 | return retval; |
124 | } |
125 | |
126 | void ReadShortCircuitInfoBuilder::release(const ReadShortCircuitInfo& info) { |
127 | if (info.isValid() && !info.isLegacy()) { |
128 | ReadShortCircuitFDCache.insert(info.getKey(), info.getFdHolder()); |
129 | LOG(DEBUG1, |
130 | "Inserted file descriptors into cache for block %s, cache size %zu" , |
131 | info.formatBlockInfo().c_str(), ReadShortCircuitFDCache.size()); |
132 | } |
133 | } |
134 | |
135 | BlockLocalPathInfo ReadShortCircuitInfoBuilder::getBlockLocalPathInfo( |
136 | const ExtendedBlock& block, const Token& token) { |
137 | BlockLocalPathInfo retval; |
138 | |
139 | ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(), |
140 | block.getPoolId()); |
141 | |
142 | try { |
143 | if (!BlockLocalPathInfoCache.find(key, &retval)) { |
144 | RpcAuth a = auth; |
145 | SessionConfig c = conf; |
146 | c.setRpcMaxRetryOnConnect(1); |
147 | |
148 | /* |
149 | * only kerberos based authentication is allowed, do not add |
150 | * token |
151 | */ |
152 | shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl( |
153 | dnInfo.getIpAddr().c_str(), dnInfo.getIpcPort(), c, a)); |
154 | dn->getBlockLocalPathInfo(block, token, retval); |
155 | |
156 | BlockLocalPathInfoCache.setMaxSize(conf.getMaxLocalBlockInfoCacheSize()); |
157 | BlockLocalPathInfoCache.insert(key, retval); |
158 | |
159 | LOG(DEBUG1, "Inserted block %s to local block info cache, cache size %zu" , |
160 | block.toString().c_str(), BlockLocalPathInfoCache.size()); |
161 | } else { |
162 | LOG(DEBUG1, |
163 | "Get local block info from cache for block %s, cache size %zu" , |
164 | block.toString().c_str(), BlockLocalPathInfoCache.size()); |
165 | } |
166 | } catch (const HdfsIOException& e) { |
167 | throw; |
168 | } catch (const HdfsException& e) { |
169 | NESTED_THROW(HdfsIOException, |
170 | "ReadShortCircuitInfoBuilder: Failed to get block local " |
171 | "path information." ); |
172 | } |
173 | |
174 | return retval; |
175 | } |
176 | |
177 | void ReadShortCircuitInfoBuilder::invalidBlockLocalPathInfo( |
178 | const ExtendedBlock& block) { |
179 | BlockLocalPathInfoCache.erase(ReadShortCircuitInfoKey( |
180 | dnInfo.getXferPort(), block.getBlockId(), block.getPoolId())); |
181 | } |
182 | |
183 | shared_ptr<ReadShortCircuitInfo> |
184 | ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( |
185 | const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info) { |
186 | shared_ptr<FileWrapper> dataFile; |
187 | shared_ptr<FileWrapper> metaFile; |
188 | |
189 | std::string metaFilePath = info.getLocalMetaPath(); |
190 | std::string dataFilePath = info.getLocalBlockPath(); |
191 | |
192 | if (conf.doUseMappedFile()) { |
193 | metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
194 | dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
195 | } else { |
196 | metaFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
197 | dataFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
198 | } |
199 | |
200 | if (!metaFile->open(metaFilePath)) { |
201 | THROW(HdfsIOException, |
202 | "ReadShortCircuitInfoBuilder cannot open metadata file \"%s\", %s" , |
203 | metaFilePath.c_str(), GetSystemErrorInfo(errno)); |
204 | } |
205 | |
206 | if (!dataFile->open(dataFilePath)) { |
207 | THROW(HdfsIOException, |
208 | "ReadShortCircuitInfoBuilder cannot open data file \"%s\", %s" , |
209 | dataFilePath.c_str(), GetSystemErrorInfo(errno)); |
210 | } |
211 | |
212 | dataFile->seek(0); |
213 | metaFile->seek(0); |
214 | |
215 | shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, true)); |
216 | retval->setDataFile(dataFile); |
217 | retval->setMetaFile(metaFile); |
218 | return retval; |
219 | } |
220 | |
221 | std::string ReadShortCircuitInfoBuilder::buildDomainSocketAddress( |
222 | uint32_t port) { |
223 | std::string domainSocketPath = conf.getDomainSocketPath(); |
224 | |
225 | if (domainSocketPath.empty()) { |
226 | THROW(HdfsIOException, |
227 | "ReadShortCircuitInfoBuilder: \"dfs.domain.socket.path\" is not " |
228 | "set" ); |
229 | } |
230 | |
231 | std::stringstream ss; |
232 | ss.imbue(std::locale::classic()); |
233 | ss << port; |
234 | StringReplaceAll(domainSocketPath, "_PORT" , ss.str()); |
235 | |
236 | return domainSocketPath; |
237 | } |
238 | |
239 | shared_ptr<ReadShortCircuitInfo> |
240 | ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( |
241 | const ReadShortCircuitInfoKey& key, const ExtendedBlock& block, |
242 | const Token& token) { |
243 | std::string addr = buildDomainSocketAddress(key.dnPort); |
244 | DomainSocketImpl sock; |
245 | sock.connect(addr.c_str(), 0, conf.getInputConnTimeout()); |
246 | DataTransferProtocolSender sender(sock, conf.getInputWriteTimeout(), addr); |
247 | sender.requestShortCircuitFds(block, token, MaxReadShortCircuitVersion); |
248 | shared_ptr<ReadShortCircuitFDHolder> fds = |
249 | receiveReadShortCircuitFDs(sock, block); |
250 | return createReadShortCircuitInfo(key, fds); |
251 | } |
252 | |
253 | shared_ptr<ReadShortCircuitFDHolder> |
254 | ReadShortCircuitInfoBuilder::receiveReadShortCircuitFDs( |
255 | Socket& sock, const ExtendedBlock& block) { |
256 | std::vector<char> respBuffer; |
257 | int readTimeout = conf.getInputReadTimeout(); |
258 | shared_ptr<BufferedSocketReader> in( |
259 | new BufferedSocketReaderImpl(sock, 0)); // disable buffer |
260 | int32_t respSize = in->readVarint32(readTimeout); |
261 | |
262 | if (respSize <= 0 || respSize > 10 * 1024 * 1024) { |
263 | THROW(HdfsIOException, |
264 | "ReadShortCircuitInfoBuilder get a invalid response size: %d, " |
265 | "Block: %s, " |
266 | "from Datanode: %s" , |
267 | respSize, block.toString().c_str(), dnInfo.formatAddress().c_str()); |
268 | } |
269 | |
270 | respBuffer.resize(respSize); |
271 | in->readFully(&respBuffer[0], respSize, readTimeout); |
272 | BlockOpResponseProto resp; |
273 | |
274 | if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) { |
275 | THROW(HdfsIOException, |
276 | "ReadShortCircuitInfoBuilder cannot parse BlockOpResponseProto " |
277 | "from " |
278 | "Datanode response, " |
279 | "Block: %s, from Datanode: %s" , |
280 | block.toString().c_str(), dnInfo.formatAddress().c_str()); |
281 | } |
282 | |
283 | if (resp.status() != Status::DT_PROTO_SUCCESS) { |
284 | std::string msg; |
285 | |
286 | if (resp.has_message()) { |
287 | msg = resp.message(); |
288 | } |
289 | |
290 | if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) { |
291 | THROW(HdfsInvalidBlockToken, |
292 | "ReadShortCircuitInfoBuilder: block's token is invalid. " |
293 | "Datanode: %s, Block: %s" , |
294 | dnInfo.formatAddress().c_str(), block.toString().c_str()); |
295 | } else if (resp.status() == Status::DT_PROTO_ERROR_UNSUPPORTED) { |
296 | THROW(HdfsIOException, |
297 | "short-circuit read access is disabled for " |
298 | "DataNode %s. reason: %s" , |
299 | dnInfo.formatAddress().c_str(), |
300 | (msg.empty() ? "check Datanode's log for more information" |
301 | : msg.c_str())); |
302 | } else { |
303 | THROW(HdfsIOException, |
304 | "ReadShortCircuitInfoBuilder: Datanode return an error when " |
305 | "sending read request to Datanode: %s, Block: %s, %s." , |
306 | dnInfo.formatAddress().c_str(), block.toString().c_str(), |
307 | (msg.empty() ? "check Datanode's log for more information" |
308 | : msg.c_str())); |
309 | } |
310 | } |
311 | |
312 | DomainSocketImpl* domainSocket = dynamic_cast<DomainSocketImpl*>(&sock); |
313 | |
314 | if (NULL == domainSocket) { |
315 | THROW(HdfsIOException, "Read short-circuit only works with Domain Socket" ); |
316 | } |
317 | |
318 | shared_ptr<ReadShortCircuitFDHolder> fds(new ReadShortCircuitFDHolder); |
319 | |
320 | std::vector<int> tempFds(2, -1); |
321 | respBuffer.resize(1); |
322 | domainSocket->receiveFileDescriptors(&tempFds[0], tempFds.size(), |
323 | &respBuffer[0], respBuffer.size()); |
324 | |
325 | assert(tempFds[0] != -1 && "failed to receive data file descriptor" ); |
326 | assert(tempFds[1] != -1 && "failed to receive metadata file descriptor" ); |
327 | |
328 | fds->datafd = tempFds[0]; |
329 | fds->metafd = tempFds[1]; |
330 | |
331 | return fds; |
332 | } |
333 | |
334 | shared_ptr<ReadShortCircuitInfo> |
335 | ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( |
336 | const ReadShortCircuitInfoKey& key, |
337 | const shared_ptr<ReadShortCircuitFDHolder>& fds) { |
338 | shared_ptr<FileWrapper> dataFile; |
339 | shared_ptr<FileWrapper> metaFile; |
340 | |
341 | if (conf.doUseMappedFile()) { |
342 | metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
343 | dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
344 | } else { |
345 | metaFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
346 | dataFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
347 | } |
348 | |
349 | metaFile->open(fds->metafd, false); |
350 | dataFile->open(fds->datafd, false); |
351 | |
352 | dataFile->seek(0); |
353 | metaFile->seek(0); |
354 | |
355 | shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, false)); |
356 | |
357 | retval->setFdHolder(fds); |
358 | retval->setDataFile(dataFile); |
359 | retval->setMetaFile(metaFile); |
360 | |
361 | return retval; |
362 | } |
363 | } |
364 | } |
365 | |