| 1 | /******************************************************************** |
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Author: Zhanwei Wang |
| 6 | ********************************************************************/ |
| 7 | /******************************************************************** |
| 8 | * 2014 - |
| 9 | * open source under Apache License Version 2.0 |
| 10 | ********************************************************************/ |
| 11 | /** |
| 12 | * Licensed to the Apache Software Foundation (ASF) under one |
| 13 | * or more contributor license agreements. See the NOTICE file |
| 14 | * distributed with this work for additional information |
| 15 | * regarding copyright ownership. The ASF licenses this file |
| 16 | * to you under the Apache License, Version 2.0 (the |
| 17 | * "License"); you may not use this file except in compliance |
| 18 | * with the License. You may obtain a copy of the License at |
| 19 | * |
| 20 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | * |
| 22 | * Unless required by applicable law or agreed to in writing, software |
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | * See the License for the specific language governing permissions and |
| 26 | * limitations under the License. |
| 27 | */ |
| 28 | #include "client/DataTransferProtocolSender.h" |
| 29 | #include "ReadShortCircuitInfo.h" |
| 30 | #include "server/Datanode.h" |
| 31 | #include "datatransfer.pb.h" |
| 32 | #include "Exception.h" |
| 33 | #include "ExceptionInternal.h" |
| 34 | #include "network/DomainSocket.h" |
| 35 | #include "SWCrc32c.h" |
| 36 | #include "HWCrc32c.h" |
| 37 | #include "StringUtil.h" |
| 38 | |
| 39 | #include <inttypes.h> |
| 40 | #include <sstream> |
| 41 | #include <vector> |
| 42 | |
| 43 | namespace Hdfs { |
| 44 | namespace Internal { |
| 45 | |
| 46 | ReadShortCircuitFDCacheType |
| 47 | ReadShortCircuitInfoBuilder::ReadShortCircuitFDCache; |
| 48 | BlockLocalPathInfoCacheType |
| 49 | ReadShortCircuitInfoBuilder::BlockLocalPathInfoCache; |
| 50 | |
| 51 | ReadShortCircuitInfo::~ReadShortCircuitInfo() { |
| 52 | try { |
| 53 | dataFile.reset(); |
| 54 | metaFile.reset(); |
| 55 | ReadShortCircuitInfoBuilder::release(*this); |
| 56 | } catch (...) { |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | ReadShortCircuitFDHolder::~ReadShortCircuitFDHolder() { |
| 61 | if (metafd != -1) { |
| 62 | ::close(metafd); |
| 63 | } |
| 64 | |
| 65 | if (datafd != -1) { |
| 66 | ::close(datafd); |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | ReadShortCircuitInfoBuilder::ReadShortCircuitInfoBuilder( |
| 71 | const DatanodeInfo& dnInfo, const RpcAuth& auth, const SessionConfig& conf) |
| 72 | : dnInfo(dnInfo), auth(auth), conf(conf) {} |
| 73 | |
| 74 | shared_ptr<ReadShortCircuitInfo> ReadShortCircuitInfoBuilder::fetchOrCreate( |
| 75 | const ExtendedBlock& block, const Token token) { |
| 76 | shared_ptr<ReadShortCircuitInfo> retval; |
| 77 | ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(), |
| 78 | block.getPoolId()); |
| 79 | |
| 80 | if (conf.isLegacyLocalBlockReader()) { |
| 81 | if (auth.getProtocol() != AuthProtocol::NONE) { |
| 82 | LOG(WARNING, |
| 83 | "Legacy read-shortcircuit only works for simple " |
| 84 | "authentication" ); |
| 85 | return shared_ptr<ReadShortCircuitInfo>(); |
| 86 | } |
| 87 | |
| 88 | BlockLocalPathInfo info = getBlockLocalPathInfo(block, token); |
| 89 | assert(block.getBlockId() == info.getBlock().getBlockId() && |
| 90 | block.getPoolId() == info.getBlock().getPoolId()); |
| 91 | |
| 92 | if (0 != access(info.getLocalMetaPath(), R_OK)) { |
| 93 | invalidBlockLocalPathInfo(block); |
| 94 | LOG(WARNING, |
| 95 | "Legacy read-shortcircuit is enabled but path:%s is not " |
| 96 | "readable." , |
| 97 | info.getLocalMetaPath()); |
| 98 | return shared_ptr<ReadShortCircuitInfo>(); |
| 99 | } |
| 100 | |
| 101 | retval = createReadShortCircuitInfo(key, info); |
| 102 | } else { |
| 103 | shared_ptr<ReadShortCircuitFDHolder> fds; |
| 104 | |
| 105 | // find a pair available file descriptors in cache. |
| 106 | if (ReadShortCircuitFDCache.findAndErase(key, &fds)) { |
| 107 | try { |
| 108 | LOG(DEBUG1, |
| 109 | "Get file descriptors from cache for block %s, cache size %zu" , |
| 110 | block.toString().c_str(), ReadShortCircuitFDCache.size()); |
| 111 | |
| 112 | return createReadShortCircuitInfo(key, fds); |
| 113 | } catch (...) { |
| 114 | // failed to create file wrapper from fds, retry with new fds. |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | // create a new one |
| 119 | retval = createReadShortCircuitInfo(key, block, token); |
| 120 | ReadShortCircuitFDCache.setMaxSize(conf.getMaxFileDescriptorCacheSize()); |
| 121 | } |
| 122 | |
| 123 | return retval; |
| 124 | } |
| 125 | |
| 126 | void ReadShortCircuitInfoBuilder::release(const ReadShortCircuitInfo& info) { |
| 127 | if (info.isValid() && !info.isLegacy()) { |
| 128 | ReadShortCircuitFDCache.insert(info.getKey(), info.getFdHolder()); |
| 129 | LOG(DEBUG1, |
| 130 | "Inserted file descriptors into cache for block %s, cache size %zu" , |
| 131 | info.formatBlockInfo().c_str(), ReadShortCircuitFDCache.size()); |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | BlockLocalPathInfo ReadShortCircuitInfoBuilder::getBlockLocalPathInfo( |
| 136 | const ExtendedBlock& block, const Token& token) { |
| 137 | BlockLocalPathInfo retval; |
| 138 | |
| 139 | ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(), |
| 140 | block.getPoolId()); |
| 141 | |
| 142 | try { |
| 143 | if (!BlockLocalPathInfoCache.find(key, &retval)) { |
| 144 | RpcAuth a = auth; |
| 145 | SessionConfig c = conf; |
| 146 | c.setRpcMaxRetryOnConnect(1); |
| 147 | |
| 148 | /* |
| 149 | * only kerberos based authentication is allowed, do not add |
| 150 | * token |
| 151 | */ |
| 152 | shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl( |
| 153 | dnInfo.getIpAddr().c_str(), dnInfo.getIpcPort(), c, a)); |
| 154 | dn->getBlockLocalPathInfo(block, token, retval); |
| 155 | |
| 156 | BlockLocalPathInfoCache.setMaxSize(conf.getMaxLocalBlockInfoCacheSize()); |
| 157 | BlockLocalPathInfoCache.insert(key, retval); |
| 158 | |
| 159 | LOG(DEBUG1, "Inserted block %s to local block info cache, cache size %zu" , |
| 160 | block.toString().c_str(), BlockLocalPathInfoCache.size()); |
| 161 | } else { |
| 162 | LOG(DEBUG1, |
| 163 | "Get local block info from cache for block %s, cache size %zu" , |
| 164 | block.toString().c_str(), BlockLocalPathInfoCache.size()); |
| 165 | } |
| 166 | } catch (const HdfsIOException& e) { |
| 167 | throw; |
| 168 | } catch (const HdfsException& e) { |
| 169 | NESTED_THROW(HdfsIOException, |
| 170 | "ReadShortCircuitInfoBuilder: Failed to get block local " |
| 171 | "path information." ); |
| 172 | } |
| 173 | |
| 174 | return retval; |
| 175 | } |
| 176 | |
| 177 | void ReadShortCircuitInfoBuilder::invalidBlockLocalPathInfo( |
| 178 | const ExtendedBlock& block) { |
| 179 | BlockLocalPathInfoCache.erase(ReadShortCircuitInfoKey( |
| 180 | dnInfo.getXferPort(), block.getBlockId(), block.getPoolId())); |
| 181 | } |
| 182 | |
| 183 | shared_ptr<ReadShortCircuitInfo> |
| 184 | ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( |
| 185 | const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info) { |
| 186 | shared_ptr<FileWrapper> dataFile; |
| 187 | shared_ptr<FileWrapper> metaFile; |
| 188 | |
| 189 | std::string metaFilePath = info.getLocalMetaPath(); |
| 190 | std::string dataFilePath = info.getLocalBlockPath(); |
| 191 | |
| 192 | if (conf.doUseMappedFile()) { |
| 193 | metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
| 194 | dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
| 195 | } else { |
| 196 | metaFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
| 197 | dataFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
| 198 | } |
| 199 | |
| 200 | if (!metaFile->open(metaFilePath)) { |
| 201 | THROW(HdfsIOException, |
| 202 | "ReadShortCircuitInfoBuilder cannot open metadata file \"%s\", %s" , |
| 203 | metaFilePath.c_str(), GetSystemErrorInfo(errno)); |
| 204 | } |
| 205 | |
| 206 | if (!dataFile->open(dataFilePath)) { |
| 207 | THROW(HdfsIOException, |
| 208 | "ReadShortCircuitInfoBuilder cannot open data file \"%s\", %s" , |
| 209 | dataFilePath.c_str(), GetSystemErrorInfo(errno)); |
| 210 | } |
| 211 | |
| 212 | dataFile->seek(0); |
| 213 | metaFile->seek(0); |
| 214 | |
| 215 | shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, true)); |
| 216 | retval->setDataFile(dataFile); |
| 217 | retval->setMetaFile(metaFile); |
| 218 | return retval; |
| 219 | } |
| 220 | |
| 221 | std::string ReadShortCircuitInfoBuilder::buildDomainSocketAddress( |
| 222 | uint32_t port) { |
| 223 | std::string domainSocketPath = conf.getDomainSocketPath(); |
| 224 | |
| 225 | if (domainSocketPath.empty()) { |
| 226 | THROW(HdfsIOException, |
| 227 | "ReadShortCircuitInfoBuilder: \"dfs.domain.socket.path\" is not " |
| 228 | "set" ); |
| 229 | } |
| 230 | |
| 231 | std::stringstream ss; |
| 232 | ss.imbue(std::locale::classic()); |
| 233 | ss << port; |
| 234 | StringReplaceAll(domainSocketPath, "_PORT" , ss.str()); |
| 235 | |
| 236 | return domainSocketPath; |
| 237 | } |
| 238 | |
| 239 | shared_ptr<ReadShortCircuitInfo> |
| 240 | ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( |
| 241 | const ReadShortCircuitInfoKey& key, const ExtendedBlock& block, |
| 242 | const Token& token) { |
| 243 | std::string addr = buildDomainSocketAddress(key.dnPort); |
| 244 | DomainSocketImpl sock; |
| 245 | sock.connect(addr.c_str(), 0, conf.getInputConnTimeout()); |
| 246 | DataTransferProtocolSender sender(sock, conf.getInputWriteTimeout(), addr); |
| 247 | sender.requestShortCircuitFds(block, token, MaxReadShortCircuitVersion); |
| 248 | shared_ptr<ReadShortCircuitFDHolder> fds = |
| 249 | receiveReadShortCircuitFDs(sock, block); |
| 250 | return createReadShortCircuitInfo(key, fds); |
| 251 | } |
| 252 | |
| 253 | shared_ptr<ReadShortCircuitFDHolder> |
| 254 | ReadShortCircuitInfoBuilder::receiveReadShortCircuitFDs( |
| 255 | Socket& sock, const ExtendedBlock& block) { |
| 256 | std::vector<char> respBuffer; |
| 257 | int readTimeout = conf.getInputReadTimeout(); |
| 258 | shared_ptr<BufferedSocketReader> in( |
| 259 | new BufferedSocketReaderImpl(sock, 0)); // disable buffer |
| 260 | int32_t respSize = in->readVarint32(readTimeout); |
| 261 | |
| 262 | if (respSize <= 0 || respSize > 10 * 1024 * 1024) { |
| 263 | THROW(HdfsIOException, |
| 264 | "ReadShortCircuitInfoBuilder get a invalid response size: %d, " |
| 265 | "Block: %s, " |
| 266 | "from Datanode: %s" , |
| 267 | respSize, block.toString().c_str(), dnInfo.formatAddress().c_str()); |
| 268 | } |
| 269 | |
| 270 | respBuffer.resize(respSize); |
| 271 | in->readFully(&respBuffer[0], respSize, readTimeout); |
| 272 | BlockOpResponseProto resp; |
| 273 | |
| 274 | if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) { |
| 275 | THROW(HdfsIOException, |
| 276 | "ReadShortCircuitInfoBuilder cannot parse BlockOpResponseProto " |
| 277 | "from " |
| 278 | "Datanode response, " |
| 279 | "Block: %s, from Datanode: %s" , |
| 280 | block.toString().c_str(), dnInfo.formatAddress().c_str()); |
| 281 | } |
| 282 | |
| 283 | if (resp.status() != Status::DT_PROTO_SUCCESS) { |
| 284 | std::string msg; |
| 285 | |
| 286 | if (resp.has_message()) { |
| 287 | msg = resp.message(); |
| 288 | } |
| 289 | |
| 290 | if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) { |
| 291 | THROW(HdfsInvalidBlockToken, |
| 292 | "ReadShortCircuitInfoBuilder: block's token is invalid. " |
| 293 | "Datanode: %s, Block: %s" , |
| 294 | dnInfo.formatAddress().c_str(), block.toString().c_str()); |
| 295 | } else if (resp.status() == Status::DT_PROTO_ERROR_UNSUPPORTED) { |
| 296 | THROW(HdfsIOException, |
| 297 | "short-circuit read access is disabled for " |
| 298 | "DataNode %s. reason: %s" , |
| 299 | dnInfo.formatAddress().c_str(), |
| 300 | (msg.empty() ? "check Datanode's log for more information" |
| 301 | : msg.c_str())); |
| 302 | } else { |
| 303 | THROW(HdfsIOException, |
| 304 | "ReadShortCircuitInfoBuilder: Datanode return an error when " |
| 305 | "sending read request to Datanode: %s, Block: %s, %s." , |
| 306 | dnInfo.formatAddress().c_str(), block.toString().c_str(), |
| 307 | (msg.empty() ? "check Datanode's log for more information" |
| 308 | : msg.c_str())); |
| 309 | } |
| 310 | } |
| 311 | |
| 312 | DomainSocketImpl* domainSocket = dynamic_cast<DomainSocketImpl*>(&sock); |
| 313 | |
| 314 | if (NULL == domainSocket) { |
| 315 | THROW(HdfsIOException, "Read short-circuit only works with Domain Socket" ); |
| 316 | } |
| 317 | |
| 318 | shared_ptr<ReadShortCircuitFDHolder> fds(new ReadShortCircuitFDHolder); |
| 319 | |
| 320 | std::vector<int> tempFds(2, -1); |
| 321 | respBuffer.resize(1); |
| 322 | domainSocket->receiveFileDescriptors(&tempFds[0], tempFds.size(), |
| 323 | &respBuffer[0], respBuffer.size()); |
| 324 | |
| 325 | assert(tempFds[0] != -1 && "failed to receive data file descriptor" ); |
| 326 | assert(tempFds[1] != -1 && "failed to receive metadata file descriptor" ); |
| 327 | |
| 328 | fds->datafd = tempFds[0]; |
| 329 | fds->metafd = tempFds[1]; |
| 330 | |
| 331 | return fds; |
| 332 | } |
| 333 | |
| 334 | shared_ptr<ReadShortCircuitInfo> |
| 335 | ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( |
| 336 | const ReadShortCircuitInfoKey& key, |
| 337 | const shared_ptr<ReadShortCircuitFDHolder>& fds) { |
| 338 | shared_ptr<FileWrapper> dataFile; |
| 339 | shared_ptr<FileWrapper> metaFile; |
| 340 | |
| 341 | if (conf.doUseMappedFile()) { |
| 342 | metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
| 343 | dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); |
| 344 | } else { |
| 345 | metaFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
| 346 | dataFile = shared_ptr<CFileWrapper>(new CFileWrapper); |
| 347 | } |
| 348 | |
| 349 | metaFile->open(fds->metafd, false); |
| 350 | dataFile->open(fds->datafd, false); |
| 351 | |
| 352 | dataFile->seek(0); |
| 353 | metaFile->seek(0); |
| 354 | |
| 355 | shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, false)); |
| 356 | |
| 357 | retval->setFdHolder(fds); |
| 358 | retval->setDataFile(dataFile); |
| 359 | retval->setMetaFile(metaFile); |
| 360 | |
| 361 | return retval; |
| 362 | } |
| 363 | } |
| 364 | } |
| 365 | |