1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "client/DataTransferProtocolSender.h"
29#include "ReadShortCircuitInfo.h"
30#include "server/Datanode.h"
31#include "datatransfer.pb.h"
32#include "Exception.h"
33#include "ExceptionInternal.h"
34#include "network/DomainSocket.h"
35#include "SWCrc32c.h"
36#include "HWCrc32c.h"
37#include "StringUtil.h"
38
39#include <inttypes.h>
40#include <sstream>
41#include <vector>
42
43namespace Hdfs {
44namespace Internal {
45
46ReadShortCircuitFDCacheType
47 ReadShortCircuitInfoBuilder::ReadShortCircuitFDCache;
48BlockLocalPathInfoCacheType
49 ReadShortCircuitInfoBuilder::BlockLocalPathInfoCache;
50
/**
 * Releases this object's resources: drops the data and metadata file
 * wrappers, then passes *this to ReadShortCircuitInfoBuilder::release(),
 * which puts the descriptor holder into the shared descriptor cache when the
 * entry is valid and not legacy.
 */
ReadShortCircuitInfo::~ReadShortCircuitInfo() {
  try {
    // Drop the wrappers before the descriptors are handed back for reuse.
    dataFile.reset();
    metaFile.reset();
    ReadShortCircuitInfoBuilder::release(*this);
  } catch (...) {
    // Best effort: an exception must never escape a destructor.
  }
}
59
60ReadShortCircuitFDHolder::~ReadShortCircuitFDHolder() {
61 if (metafd != -1) {
62 ::close(metafd);
63 }
64
65 if (datafd != -1) {
66 ::close(datafd);
67 }
68}
69
70ReadShortCircuitInfoBuilder::ReadShortCircuitInfoBuilder(
71 const DatanodeInfo& dnInfo, const RpcAuth& auth, const SessionConfig& conf)
72 : dnInfo(dnInfo), auth(auth), conf(conf) {}
73
74shared_ptr<ReadShortCircuitInfo> ReadShortCircuitInfoBuilder::fetchOrCreate(
75 const ExtendedBlock& block, const Token token) {
76 shared_ptr<ReadShortCircuitInfo> retval;
77 ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(),
78 block.getPoolId());
79
80 if (conf.isLegacyLocalBlockReader()) {
81 if (auth.getProtocol() != AuthProtocol::NONE) {
82 LOG(WARNING,
83 "Legacy read-shortcircuit only works for simple "
84 "authentication");
85 return shared_ptr<ReadShortCircuitInfo>();
86 }
87
88 BlockLocalPathInfo info = getBlockLocalPathInfo(block, token);
89 assert(block.getBlockId() == info.getBlock().getBlockId() &&
90 block.getPoolId() == info.getBlock().getPoolId());
91
92 if (0 != access(info.getLocalMetaPath(), R_OK)) {
93 invalidBlockLocalPathInfo(block);
94 LOG(WARNING,
95 "Legacy read-shortcircuit is enabled but path:%s is not "
96 "readable.",
97 info.getLocalMetaPath());
98 return shared_ptr<ReadShortCircuitInfo>();
99 }
100
101 retval = createReadShortCircuitInfo(key, info);
102 } else {
103 shared_ptr<ReadShortCircuitFDHolder> fds;
104
105 // find a pair available file descriptors in cache.
106 if (ReadShortCircuitFDCache.findAndErase(key, &fds)) {
107 try {
108 LOG(DEBUG1,
109 "Get file descriptors from cache for block %s, cache size %zu",
110 block.toString().c_str(), ReadShortCircuitFDCache.size());
111
112 return createReadShortCircuitInfo(key, fds);
113 } catch (...) {
114 // failed to create file wrapper from fds, retry with new fds.
115 }
116 }
117
118 // create a new one
119 retval = createReadShortCircuitInfo(key, block, token);
120 ReadShortCircuitFDCache.setMaxSize(conf.getMaxFileDescriptorCacheSize());
121 }
122
123 return retval;
124}
125
126void ReadShortCircuitInfoBuilder::release(const ReadShortCircuitInfo& info) {
127 if (info.isValid() && !info.isLegacy()) {
128 ReadShortCircuitFDCache.insert(info.getKey(), info.getFdHolder());
129 LOG(DEBUG1,
130 "Inserted file descriptors into cache for block %s, cache size %zu",
131 info.formatBlockInfo().c_str(), ReadShortCircuitFDCache.size());
132 }
133}
134
135BlockLocalPathInfo ReadShortCircuitInfoBuilder::getBlockLocalPathInfo(
136 const ExtendedBlock& block, const Token& token) {
137 BlockLocalPathInfo retval;
138
139 ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(),
140 block.getPoolId());
141
142 try {
143 if (!BlockLocalPathInfoCache.find(key, &retval)) {
144 RpcAuth a = auth;
145 SessionConfig c = conf;
146 c.setRpcMaxRetryOnConnect(1);
147
148 /*
149 * only kerberos based authentication is allowed, do not add
150 * token
151 */
152 shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl(
153 dnInfo.getIpAddr().c_str(), dnInfo.getIpcPort(), c, a));
154 dn->getBlockLocalPathInfo(block, token, retval);
155
156 BlockLocalPathInfoCache.setMaxSize(conf.getMaxLocalBlockInfoCacheSize());
157 BlockLocalPathInfoCache.insert(key, retval);
158
159 LOG(DEBUG1, "Inserted block %s to local block info cache, cache size %zu",
160 block.toString().c_str(), BlockLocalPathInfoCache.size());
161 } else {
162 LOG(DEBUG1,
163 "Get local block info from cache for block %s, cache size %zu",
164 block.toString().c_str(), BlockLocalPathInfoCache.size());
165 }
166 } catch (const HdfsIOException& e) {
167 throw;
168 } catch (const HdfsException& e) {
169 NESTED_THROW(HdfsIOException,
170 "ReadShortCircuitInfoBuilder: Failed to get block local "
171 "path information.");
172 }
173
174 return retval;
175}
176
177void ReadShortCircuitInfoBuilder::invalidBlockLocalPathInfo(
178 const ExtendedBlock& block) {
179 BlockLocalPathInfoCache.erase(ReadShortCircuitInfoKey(
180 dnInfo.getXferPort(), block.getBlockId(), block.getPoolId()));
181}
182
183shared_ptr<ReadShortCircuitInfo>
184ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
185 const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info) {
186 shared_ptr<FileWrapper> dataFile;
187 shared_ptr<FileWrapper> metaFile;
188
189 std::string metaFilePath = info.getLocalMetaPath();
190 std::string dataFilePath = info.getLocalBlockPath();
191
192 if (conf.doUseMappedFile()) {
193 metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
194 dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
195 } else {
196 metaFile = shared_ptr<CFileWrapper>(new CFileWrapper);
197 dataFile = shared_ptr<CFileWrapper>(new CFileWrapper);
198 }
199
200 if (!metaFile->open(metaFilePath)) {
201 THROW(HdfsIOException,
202 "ReadShortCircuitInfoBuilder cannot open metadata file \"%s\", %s",
203 metaFilePath.c_str(), GetSystemErrorInfo(errno));
204 }
205
206 if (!dataFile->open(dataFilePath)) {
207 THROW(HdfsIOException,
208 "ReadShortCircuitInfoBuilder cannot open data file \"%s\", %s",
209 dataFilePath.c_str(), GetSystemErrorInfo(errno));
210 }
211
212 dataFile->seek(0);
213 metaFile->seek(0);
214
215 shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, true));
216 retval->setDataFile(dataFile);
217 retval->setMetaFile(metaFile);
218 return retval;
219}
220
221std::string ReadShortCircuitInfoBuilder::buildDomainSocketAddress(
222 uint32_t port) {
223 std::string domainSocketPath = conf.getDomainSocketPath();
224
225 if (domainSocketPath.empty()) {
226 THROW(HdfsIOException,
227 "ReadShortCircuitInfoBuilder: \"dfs.domain.socket.path\" is not "
228 "set");
229 }
230
231 std::stringstream ss;
232 ss.imbue(std::locale::classic());
233 ss << port;
234 StringReplaceAll(domainSocketPath, "_PORT", ss.str());
235
236 return domainSocketPath;
237}
238
239shared_ptr<ReadShortCircuitInfo>
240ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
241 const ReadShortCircuitInfoKey& key, const ExtendedBlock& block,
242 const Token& token) {
243 std::string addr = buildDomainSocketAddress(key.dnPort);
244 DomainSocketImpl sock;
245 sock.connect(addr.c_str(), 0, conf.getInputConnTimeout());
246 DataTransferProtocolSender sender(sock, conf.getInputWriteTimeout(), addr);
247 sender.requestShortCircuitFds(block, token, MaxReadShortCircuitVersion);
248 shared_ptr<ReadShortCircuitFDHolder> fds =
249 receiveReadShortCircuitFDs(sock, block);
250 return createReadShortCircuitInfo(key, fds);
251}
252
253shared_ptr<ReadShortCircuitFDHolder>
254ReadShortCircuitInfoBuilder::receiveReadShortCircuitFDs(
255 Socket& sock, const ExtendedBlock& block) {
256 std::vector<char> respBuffer;
257 int readTimeout = conf.getInputReadTimeout();
258 shared_ptr<BufferedSocketReader> in(
259 new BufferedSocketReaderImpl(sock, 0)); // disable buffer
260 int32_t respSize = in->readVarint32(readTimeout);
261
262 if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
263 THROW(HdfsIOException,
264 "ReadShortCircuitInfoBuilder get a invalid response size: %d, "
265 "Block: %s, "
266 "from Datanode: %s",
267 respSize, block.toString().c_str(), dnInfo.formatAddress().c_str());
268 }
269
270 respBuffer.resize(respSize);
271 in->readFully(&respBuffer[0], respSize, readTimeout);
272 BlockOpResponseProto resp;
273
274 if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
275 THROW(HdfsIOException,
276 "ReadShortCircuitInfoBuilder cannot parse BlockOpResponseProto "
277 "from "
278 "Datanode response, "
279 "Block: %s, from Datanode: %s",
280 block.toString().c_str(), dnInfo.formatAddress().c_str());
281 }
282
283 if (resp.status() != Status::DT_PROTO_SUCCESS) {
284 std::string msg;
285
286 if (resp.has_message()) {
287 msg = resp.message();
288 }
289
290 if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) {
291 THROW(HdfsInvalidBlockToken,
292 "ReadShortCircuitInfoBuilder: block's token is invalid. "
293 "Datanode: %s, Block: %s",
294 dnInfo.formatAddress().c_str(), block.toString().c_str());
295 } else if (resp.status() == Status::DT_PROTO_ERROR_UNSUPPORTED) {
296 THROW(HdfsIOException,
297 "short-circuit read access is disabled for "
298 "DataNode %s. reason: %s",
299 dnInfo.formatAddress().c_str(),
300 (msg.empty() ? "check Datanode's log for more information"
301 : msg.c_str()));
302 } else {
303 THROW(HdfsIOException,
304 "ReadShortCircuitInfoBuilder: Datanode return an error when "
305 "sending read request to Datanode: %s, Block: %s, %s.",
306 dnInfo.formatAddress().c_str(), block.toString().c_str(),
307 (msg.empty() ? "check Datanode's log for more information"
308 : msg.c_str()));
309 }
310 }
311
312 DomainSocketImpl* domainSocket = dynamic_cast<DomainSocketImpl*>(&sock);
313
314 if (NULL == domainSocket) {
315 THROW(HdfsIOException, "Read short-circuit only works with Domain Socket");
316 }
317
318 shared_ptr<ReadShortCircuitFDHolder> fds(new ReadShortCircuitFDHolder);
319
320 std::vector<int> tempFds(2, -1);
321 respBuffer.resize(1);
322 domainSocket->receiveFileDescriptors(&tempFds[0], tempFds.size(),
323 &respBuffer[0], respBuffer.size());
324
325 assert(tempFds[0] != -1 && "failed to receive data file descriptor");
326 assert(tempFds[1] != -1 && "failed to receive metadata file descriptor");
327
328 fds->datafd = tempFds[0];
329 fds->metafd = tempFds[1];
330
331 return fds;
332}
333
334shared_ptr<ReadShortCircuitInfo>
335ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
336 const ReadShortCircuitInfoKey& key,
337 const shared_ptr<ReadShortCircuitFDHolder>& fds) {
338 shared_ptr<FileWrapper> dataFile;
339 shared_ptr<FileWrapper> metaFile;
340
341 if (conf.doUseMappedFile()) {
342 metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
343 dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
344 } else {
345 metaFile = shared_ptr<CFileWrapper>(new CFileWrapper);
346 dataFile = shared_ptr<CFileWrapper>(new CFileWrapper);
347 }
348
349 metaFile->open(fds->metafd, false);
350 dataFile->open(fds->datafd, false);
351
352 dataFile->seek(0);
353 metaFile->seek(0);
354
355 shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, false));
356
357 retval->setFdHolder(fds);
358 retval->setDataFile(dataFile);
359 retval->setMetaFile(metaFile);
360
361 return retval;
362}
363}
364}
365