| 1 | /******************************************************************** |
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Author: Zhanwei Wang |
| 6 | ********************************************************************/ |
| 7 | /******************************************************************** |
| 8 | * 2014 - |
| 9 | * open source under Apache License Version 2.0 |
| 10 | ********************************************************************/ |
| 11 | /** |
| 12 | * Licensed to the Apache Software Foundation (ASF) under one |
| 13 | * or more contributor license agreements. See the NOTICE file |
| 14 | * distributed with this work for additional information |
| 15 | * regarding copyright ownership. The ASF licenses this file |
| 16 | * to you under the Apache License, Version 2.0 (the |
| 17 | * "License"); you may not use this file except in compliance |
| 18 | * with the License. You may obtain a copy of the License at |
| 19 | * |
| 20 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | * |
| 22 | * Unless required by applicable law or agreed to in writing, software |
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | * See the License for the specific language governing permissions and |
| 26 | * limitations under the License. |
| 27 | */ |
| 28 | #include "Exception.h" |
| 29 | #include "ExceptionInternal.h" |
| 30 | #include "IpcConnectionContext.pb.h" |
| 31 | #include "Logger.h" |
| 32 | #include "RpcChannel.h" |
| 33 | #include "RpcClient.h" |
| 34 | #include "RpcContentWrapper.h" |
| 35 | #include "RpcHeader.pb.h" |
| 36 | #include "RpcHeader.pb.h" |
| 37 | #include "server/RpcHelper.h" |
| 38 | #include "Thread.h" |
| 39 | #include "WriteBuffer.h" |
| 40 | |
| 41 | #include <google/protobuf/io/coded_stream.h> |
| 42 | |
/*
 * RPC connection header constants (see sendConnectionHeader()):
 * the "hrpc" magic preamble, the wire protocol version, the serialization
 * type, and the reserved call id used when sending the connection context.
 *
 * Fix: the first two #defines had lost their macro names; both
 * RPC_HEADER_MAGIC and RPC_HEADER_VERSION are used when writing the
 * connection header below.
 */
#define RPC_HEADER_MAGIC "hrpc"
#define RPC_HEADER_VERSION 9
#define SERIALIZATION_TYPE_PROTOBUF 0
#define CONNECTION_CONTEXT_CALL_ID -3
| 47 | |
| 48 | using namespace ::google::protobuf; |
| 49 | using namespace google::protobuf::io; |
| 50 | |
| 51 | namespace Hdfs { |
| 52 | namespace Internal { |
| 53 | |
| 54 | RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, RpcClient & c) : |
| 55 | refs(0), available(false), key(k), client(c) { |
| 56 | sock = shared_ptr<Socket>(new TcpSocketImpl); |
| 57 | sock->setLingerTimeout(k.getConf().getLingerTimeout()); |
| 58 | in = shared_ptr<BufferedSocketReader>( |
| 59 | new BufferedSocketReaderImpl( |
| 60 | *static_cast<TcpSocketImpl *>(sock.get()))); |
| 61 | lastActivity = lastIdle = steady_clock::now(); |
| 62 | } |
| 63 | |
| 64 | RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, Socket * s, |
| 65 | BufferedSocketReader * in, RpcClient & c) : |
| 66 | refs(0), available(false), key(k), client(c) { |
| 67 | sock = shared_ptr<Socket>(s); |
| 68 | this->in = shared_ptr<BufferedSocketReader>(in); |
| 69 | lastActivity = lastIdle = steady_clock::now(); |
| 70 | } |
| 71 | |
| 72 | RpcChannelImpl::~RpcChannelImpl() { |
| 73 | assert(pendingCalls.empty()); |
| 74 | assert(refs == 0); |
| 75 | |
| 76 | if (available) { |
| 77 | sock->close(); |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | void RpcChannelImpl::close(bool immediate) { |
| 82 | lock_guard<mutex> lock(writeMut); |
| 83 | --refs; |
| 84 | assert(refs >= 0); |
| 85 | |
| 86 | if (immediate && !refs) { |
| 87 | assert(pendingCalls.empty()); |
| 88 | available = false; |
| 89 | sock->close(); |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | void RpcChannelImpl::sendSaslMessage(RpcSaslProto * msg, Message * resp) { |
| 94 | int totalLen; |
| 95 | WriteBuffer buffer; |
| 96 | RpcRequestHeaderProto ; |
| 97 | rpcHeader.set_callid(AuthProtocol::SASL); |
| 98 | rpcHeader.set_clientid(client.getClientId()); |
| 99 | rpcHeader.set_retrycount(INVALID_RETRY_COUNT); |
| 100 | rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER); |
| 101 | rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET); |
| 102 | RpcContentWrapper wrapper(&rpcHeader, msg); |
| 103 | totalLen = wrapper.getLength(); |
| 104 | buffer.writeBigEndian(totalLen); |
| 105 | wrapper.writeTo(buffer); |
| 106 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), key.getConf().getWriteTimeout()); |
| 107 | RpcRemoteCallPtr call( |
| 108 | new RpcRemoteCall(RpcCall(false, "sasl message" , NULL, resp), |
| 109 | AuthProtocol::SASL, client.getClientId())); |
| 110 | pendingCalls[AuthProtocol::SASL] = call; |
| 111 | } |
| 112 | |
| 113 | |
| 114 | const RpcSaslProto_SaslAuth * RpcChannelImpl::createSaslClient( |
| 115 | const RepeatedPtrField<RpcSaslProto_SaslAuth> * auths) { |
| 116 | const RpcSaslProto_SaslAuth * auth = NULL; |
| 117 | Token token; |
| 118 | |
| 119 | for (int i = 0; i < auths->size(); ++i) { |
| 120 | auth = &auths->Get(i); |
| 121 | RpcAuth method(RpcAuth::ParseMethod(auth->method())); |
| 122 | |
| 123 | if (method.getMethod() == AuthMethod::TOKEN && key.hasToken()) { |
| 124 | token = key.getToken(); |
| 125 | break; |
| 126 | } else if (method.getMethod() == AuthMethod::KERBEROS) { |
| 127 | break; |
| 128 | } else if (method.getMethod() == AuthMethod::SIMPLE) { |
| 129 | return auth; |
| 130 | } else if (method.getMethod() == AuthMethod::UNKNOWN) { |
| 131 | return auth; |
| 132 | } else { |
| 133 | auth = NULL; |
| 134 | } |
| 135 | } |
| 136 | |
| 137 | if (!auth) { |
| 138 | std::stringstream ss; |
| 139 | ss.imbue(std::locale::classic()); |
| 140 | ss << "Client cannot authenticate via: " ; |
| 141 | |
| 142 | for (int i = 0; i < auths->size(); ++i) { |
| 143 | auth = &auths->Get(i); |
| 144 | ss << auth->mechanism() << ", " ; |
| 145 | } |
| 146 | |
| 147 | THROW(AccessControlException, "%s" , ss.str().c_str()); |
| 148 | } |
| 149 | |
| 150 | saslClient = shared_ptr<SaslClient>( |
| 151 | new SaslClient(*auth, token, key.getAuth().getUser().getPrincipal())); |
| 152 | return auth; |
| 153 | } |
| 154 | |
| 155 | std::string RpcChannelImpl::saslEvaluateToken(RpcSaslProto & response, bool serverIsDone) { |
| 156 | std::string token; |
| 157 | |
| 158 | if (response.has_token()) { |
| 159 | token = saslClient->evaluateChallenge(response.token()); |
| 160 | } else if (!serverIsDone) { |
| 161 | THROW(AccessControlException, "Server challenge contains no token" ); |
| 162 | } |
| 163 | |
| 164 | if (serverIsDone) { |
| 165 | if (!saslClient->isComplete()) { |
| 166 | THROW(AccessControlException, "Client is out of sync with server" ); |
| 167 | } |
| 168 | |
| 169 | if (!token.empty()) { |
| 170 | THROW(AccessControlException, "Client generated spurious response" ); |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | return token; |
| 175 | } |
| 176 | |
| 177 | RpcAuth RpcChannelImpl::setupSaslConnection() { |
| 178 | RpcAuth retval; |
| 179 | RpcSaslProto negotiateRequest, response, msg; |
| 180 | negotiateRequest.set_state(RpcSaslProto_SaslState_NEGOTIATE); |
| 181 | sendSaslMessage(&negotiateRequest, &response); |
| 182 | bool done = false; |
| 183 | |
| 184 | do { |
| 185 | readOneResponse(false); |
| 186 | msg.Clear(); |
| 187 | |
| 188 | switch (response.state()) { |
| 189 | case RpcSaslProto_SaslState_NEGOTIATE: { |
| 190 | const RpcSaslProto_SaslAuth * auth = createSaslClient( |
| 191 | &response.auths()); |
| 192 | retval = RpcAuth(RpcAuth::ParseMethod(auth->method())); |
| 193 | |
| 194 | if (retval.getMethod() == AuthMethod::SIMPLE) { |
| 195 | done = true; |
| 196 | } else if (retval.getMethod() == AuthMethod::UNKNOWN) { |
| 197 | THROW(AccessControlException, "Unknown auth mechanism" ); |
| 198 | } else { |
| 199 | std::string respToken; |
| 200 | RpcSaslProto_SaslAuth * respAuth = msg.add_auths(); |
| 201 | respAuth->CopyFrom(*auth); |
| 202 | std::string chanllege; |
| 203 | |
| 204 | if (auth->has_challenge()) { |
| 205 | chanllege = auth->challenge(); |
| 206 | respAuth->clear_challenge(); |
| 207 | } |
| 208 | |
| 209 | respToken = saslClient->evaluateChallenge(chanllege); |
| 210 | |
| 211 | if (!respToken.empty()) { |
| 212 | msg.set_token(respToken); |
| 213 | } |
| 214 | |
| 215 | msg.set_state(RpcSaslProto_SaslState_INITIATE); |
| 216 | } |
| 217 | |
| 218 | break; |
| 219 | } |
| 220 | |
| 221 | case RpcSaslProto_SaslState_CHALLENGE: { |
| 222 | if (!saslClient) { |
| 223 | THROW(AccessControlException, "Server sent unsolicited challenge" ); |
| 224 | } |
| 225 | |
| 226 | std::string token = saslEvaluateToken(response, false); |
| 227 | msg.set_token(token); |
| 228 | msg.set_state(RpcSaslProto_SaslState_RESPONSE); |
| 229 | break; |
| 230 | } |
| 231 | |
| 232 | case RpcSaslProto_SaslState_SUCCESS: |
| 233 | if (!saslClient) { |
| 234 | retval = RpcAuth(AuthMethod::SIMPLE); |
| 235 | } else { |
| 236 | saslEvaluateToken(response, true); |
| 237 | } |
| 238 | |
| 239 | done = true; |
| 240 | break; |
| 241 | |
| 242 | default: |
| 243 | break; |
| 244 | } |
| 245 | |
| 246 | if (!done) { |
| 247 | response.Clear(); |
| 248 | sendSaslMessage(&msg, &response); |
| 249 | } |
| 250 | } while (!done); |
| 251 | |
| 252 | return retval; |
| 253 | } |
| 254 | |
/**
 * Establish the RPC connection: TCP connect, connection header, optional
 * SASL handshake, then the IPC connection context. Retries up to
 * getMaxRetryOnConnect() times, sleeping between attempts; if every
 * attempt fails, the last captured error is rethrown.
 */
void RpcChannelImpl::connect() {
    int sleep = 1;  // seconds to wait before the next retry
    exception_ptr lastError;
    const RpcConfig & conf = key.getConf();
    const RpcServerInfo & server = key.getServer();
    std::string buffer;  // scratch space for GetExceptionDetail()

    for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) {
        RpcAuth auth = key.getAuth();

        // prefer delegation-token auth whenever a token is available
        if (key.hasToken()) {
            auth.setMethod(AuthMethod::TOKEN);
        }

        try {
            while (true) {
                sock->connect(server.getHost().c_str(), server.getPort().c_str(),
                              conf.getConnectTimeout());
                sock->setNoDelay(conf.isTcpNoDelay());
                sendConnectionHeader(auth);

                if (auth.getProtocol() == AuthProtocol::SASL) {
                    auth = setupSaslConnection();

                    if (auth.getProtocol() == AuthProtocol::SASL) {
                        //success
                        break;
                    }

                    /*
                     * switch to other auth protocol: reconnect and resend
                     * the connection header with the new protocol byte
                     */
                    sock->close();
                    CheckOperationCanceled();
                } else {
                    break;
                }
            }

            auth.setUser(key.getAuth().getUser());
            sendConnectionContent(auth);
            available = true;
            lastActivity = lastIdle = steady_clock::now();
            return;
        } catch (const SaslException & e) {
            /*
             * Namenode may treat this connect as replay, retry later
             * (randomized 1-5s backoff to avoid replaying again immediately)
             */
            sleep = (rand() % 5) + 1;
            lastError = current_exception();
            LOG(LOG_ERROR,
                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsNetworkException & e) {
            sleep = 1;
            lastError = current_exception();
            LOG(LOG_ERROR,
                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsTimeoutException & e) {
            sleep = 1;
            lastError = current_exception();
            LOG(LOG_ERROR,
                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
        }

        if (i + 1 < conf.getMaxRetryOnConnect()) {
            LOG(INFO,
                "Retrying connect to server: \"%s:%s\". Already tried %d time(s)",
                server.getHost().c_str(), server.getPort().c_str(), i + 1);
        }

        sock->close();
        CheckOperationCanceled();
        // NOTE(review): this also sleeps after the final failed attempt,
        // delaying the rethrow below by up to `sleep` seconds.
        sleep_for(seconds(sleep));
    }

    rethrow_exception(lastError);
}
| 335 | |
/**
 * Send one RPC request and wait for its response.
 *
 * Threading model: whichever waiting thread wins readMut polls the socket
 * and dispatches responses on behalf of all callers; the rest block on
 * their own call until woken. Channel-level failures are translated into
 * HdfsFailoverException / HdfsRpcException and RETURNED (not thrown) so
 * the caller can decide whether to retry.
 *
 * @param remote the call to execute.
 * @return an exception_ptr describing the failure, or a null exception_ptr
 *         on success.
 */
exception_ptr RpcChannelImpl::invokeInternal(RpcRemoteCallPtr remote) {
    const RpcCall & call = remote->getCall();
    exception_ptr lastError;

    try {
        if (client.isRunning()) {
            lock_guard<mutex> lock(writeMut);

            // lazily (re)establish the connection under the write lock
            if (!available) {
                connect();
            }

            sendRequest(remote);
        }

        /*
         * We use one call thread to check response,
         * other thread will wait on RPC call complete.
         */
        while (client.isRunning()) {
            if (remote->finished()) {
                /*
                 * Current RPC call has finished.
                 * Wake up another thread to check response.
                 */
                wakeupOneCaller(remote->getIdentity());
                break;
            }

            unique_lock<mutex> lock(readMut, defer_lock_t());

            if (lock.try_lock()) {
                /*
                 * Current thread will check response.
                 */
                checkOneResponse();
            } else {
                /*
                 * Another thread checks response, just wait.
                 */
                remote->wait();
            }
        }
    } catch (const HdfsNetworkConnectException & e) {
        // failure to even connect: surface as a failover condition
        try {
            NESTED_THROW(HdfsFailoverException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsFailoverException & e) {
            lastError = current_exception();
        }
    } catch (const HdfsNetworkException & e) {
        // other network error: wrap as an RPC failure
        try {
            NESTED_THROW(HdfsRpcException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsRpcException & e) {
            lastError = current_exception();
        }
    } catch (const HdfsTimeoutException & e) {
        // timeout: treated like failover so the caller may switch servers
        try {
            NESTED_THROW(HdfsFailoverException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsFailoverException & e) {
            lastError = current_exception();
        }
    } catch (const HdfsRpcException & e) {
        // already the right type: pass through unchanged
        lastError = current_exception();
    } catch (const HdfsIOException & e) {
        try {
            NESTED_THROW(HdfsRpcException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsRpcException & e) {
            lastError = current_exception();
        }
    }

    return lastError;
}
| 421 | |
/**
 * Execute one RPC call on this channel, retrying exactly once for
 * idempotent calls when the first attempt fails with a channel-level
 * error. On any failure that makes the connection state untrustworthy the
 * channel is shut down, cancelling all pending calls.
 *
 * @param call the RPC call to execute.
 * @throw the error reported by invokeInternal(), or HdfsRpcException when
 *        the client is closing before the call finishes; remote->check()
 *        may also rethrow a server-reported (non-fatal) error.
 */
void RpcChannelImpl::invoke(const RpcCall & call) {
    assert(refs > 0);
    RpcRemoteCallPtr remote;
    exception_ptr lastError;

    try {
        bool retry = false;

        do {
            int32_t id = client.getCallId();
            remote = RpcRemoteCallPtr(new RpcRemoteCall(call, id, client.getClientId()));
            lastError = exception_ptr();
            lastError = invokeInternal(remote);

            if (lastError) {
                // the channel state is suspect: tear it down before retrying
                lock_guard<mutex> lock(writeMut);
                shutdown(lastError);

                if (!retry && call.isIdempotent()) {
                    retry = true;
                    std::string buffer;
                    LOG(LOG_ERROR,
                        "Failed to invoke RPC call \"%s\" on server \"%s:%s\": \n%s",
                        call.getName(), key.getServer().getHost().c_str(),
                        key.getServer().getPort().c_str(),
                        GetExceptionDetail(lastError, buffer));
                    LOG(INFO,
                        "Retry idempotent RPC call \"%s\" on server \"%s:%s\"",
                        call.getName(), key.getServer().getHost().c_str(),
                        key.getServer().getPort().c_str());
                } else {
                    rethrow_exception(lastError);
                }
            } else {
                break;
            }
        } while (retry);
    } catch (const HdfsRpcServerException & e) {
        // NOTE(review): assumes `remote` was assigned before any throw in
        // the try block above — confirm getCallId()/allocation cannot throw.
        if (!remote->finished()) {
            /*
             * a fatal error happened, the caller will unwrap it.
             */
            lock_guard<mutex> lock(writeMut);
            lastError = current_exception();
            shutdown(lastError);
        }

        /*
         * else not a fatal error, check again at the end of this function.
         */
    } catch (const HdfsException & e) {
        lock_guard<mutex> lock(writeMut);
        lastError = current_exception();
        shutdown(lastError);
    }

    /*
     * if the call is not finished, either failed to setup connection,
     * or client is closing.
     */
    if (!remote->finished() || !client.isRunning()) {
        lock_guard<mutex> lock(writeMut);

        if (lastError == exception_ptr()) {
            // no prior error captured: report the client shutdown itself
            try {
                THROW(Hdfs::HdfsRpcException,
                      "Failed to invoke RPC call \"%s\", RPC channel to \"%s:%s\" is to be closed since RpcClient is closing",
                      call.getName(), key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
            } catch (...) {
                lastError = current_exception();
            }
        }

        /*
         * wake up all.
         */
        shutdown(lastError);
        rethrow_exception(lastError);
    }

    // rethrows any server-reported error attached to the finished call
    remote->check();
}
| 504 | |
| 505 | void RpcChannelImpl::shutdown(exception_ptr reason) { |
| 506 | assert(reason != exception_ptr()); |
| 507 | available = false; |
| 508 | cleanupPendingCalls(reason); |
| 509 | sock->close(); |
| 510 | } |
| 511 | |
| 512 | void RpcChannelImpl::wakeupOneCaller(int32_t id) { |
| 513 | lock_guard<mutex> lock(writeMut); |
| 514 | unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e; |
| 515 | e = pendingCalls.end(); |
| 516 | |
| 517 | for (s = pendingCalls.begin(); s != e; ++s) { |
| 518 | if (s->first != id) { |
| 519 | s->second->wakeup(); |
| 520 | return; |
| 521 | } |
| 522 | } |
| 523 | } |
| 524 | |
| 525 | void RpcChannelImpl::sendRequest(RpcRemoteCallPtr remote) { |
| 526 | WriteBuffer buffer; |
| 527 | assert(true == available); |
| 528 | remote->serialize(key.getProtocol(), buffer); |
| 529 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), |
| 530 | key.getConf().getWriteTimeout()); |
| 531 | uint32_t id = remote->getIdentity(); |
| 532 | pendingCalls[id] = remote; |
| 533 | lastActivity = lastIdle = steady_clock::now(); |
| 534 | } |
| 535 | |
| 536 | void RpcChannelImpl::cleanupPendingCalls(exception_ptr reason) { |
| 537 | assert(!writeMut.try_lock()); |
| 538 | unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e; |
| 539 | e = pendingCalls.end(); |
| 540 | |
| 541 | for (s = pendingCalls.begin(); s != e; ++s) { |
| 542 | s->second->cancel(reason); |
| 543 | } |
| 544 | |
| 545 | pendingCalls.clear(); |
| 546 | } |
| 547 | |
| 548 | void RpcChannelImpl::checkOneResponse() { |
| 549 | int ping = key.getConf().getPingTimeout(); |
| 550 | int timeout = key.getConf().getRpcTimeout(); |
| 551 | steady_clock::time_point start = steady_clock::now(); |
| 552 | |
| 553 | while (client.isRunning()) { |
| 554 | if (getResponse()) { |
| 555 | readOneResponse(true); |
| 556 | return; |
| 557 | } else { |
| 558 | if (ping > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= ping) { |
| 559 | lock_guard<mutex> lock(writeMut); |
| 560 | sendPing(); |
| 561 | } |
| 562 | } |
| 563 | |
| 564 | if (timeout > 0 && ToMilliSeconds(start, steady_clock::now()) >= timeout) { |
| 565 | try { |
| 566 | THROW(Hdfs::HdfsTimeoutException, "Timeout when wait for response from RPC channel \"%s:%s\"" , |
| 567 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str()); |
| 568 | } catch (...) { |
| 569 | NESTED_THROW(Hdfs::HdfsRpcException, "Timeout when wait for response from RPC channel \"%s:%s\"" , |
| 570 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str()); |
| 571 | } |
| 572 | } |
| 573 | } |
| 574 | } |
| 575 | |
| 576 | void RpcChannelImpl::sendPing() { |
| 577 | static const std::vector<char> pingRequest = RpcRemoteCall::GetPingRequest(client.getClientId()); |
| 578 | |
| 579 | if (available) { |
| 580 | LOG(INFO, |
| 581 | "RPC channel to \"%s:%s\" got no response or idle for %d milliseconds, sending ping." , |
| 582 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), key.getConf().getPingTimeout()); |
| 583 | sock->writeFully(&pingRequest[0], pingRequest.size(), key.getConf().getWriteTimeout()); |
| 584 | lastActivity = steady_clock::now(); |
| 585 | } |
| 586 | } |
| 587 | |
| 588 | bool RpcChannelImpl::checkIdle() { |
| 589 | unique_lock<mutex> lock(writeMut, defer_lock_t()); |
| 590 | |
| 591 | if (lock.try_lock()) { |
| 592 | if (!pendingCalls.empty() || refs > 0) { |
| 593 | lastIdle = steady_clock::now(); |
| 594 | return false; |
| 595 | } |
| 596 | |
| 597 | int idle = key.getConf().getMaxIdleTime(); |
| 598 | int ping = key.getConf().getPingTimeout(); |
| 599 | |
| 600 | try { |
| 601 | //close the connection if idle timeout |
| 602 | if (ToMilliSeconds(lastIdle, steady_clock::now()) >= idle) { |
| 603 | sock->close(); |
| 604 | return true; |
| 605 | } |
| 606 | |
| 607 | //send ping |
| 608 | if (ping > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= ping) { |
| 609 | sendPing(); |
| 610 | } |
| 611 | } catch (...) { |
| 612 | std::string buffer; |
| 613 | LOG(LOG_ERROR, |
| 614 | "Failed to send ping via idle RPC channel to server \"%s:%s\": " |
| 615 | "\n%s" , |
| 616 | key.getServer().getHost().c_str(), |
| 617 | key.getServer().getPort().c_str(), |
| 618 | GetExceptionDetail(current_exception(), buffer)); |
| 619 | sock->close(); |
| 620 | return true; |
| 621 | } |
| 622 | } |
| 623 | |
| 624 | return false; |
| 625 | } |
| 626 | |
| 627 | void RpcChannelImpl::waitForExit() { |
| 628 | assert(!client.isRunning()); |
| 629 | |
| 630 | while (refs != 0) { |
| 631 | sleep_for(milliseconds(100)); |
| 632 | } |
| 633 | |
| 634 | assert(pendingCalls.empty()); |
| 635 | } |
| 636 | |
| 637 | /** |
| 638 | * Write the connection header - this is sent when connection is established |
| 639 | * +----------------------------------+ |
| 640 | * | "hrpc" 4 bytes | |
| 641 | * +----------------------------------+ |
| 642 | * | Version (1 byte) | |
| 643 | * +----------------------------------+ |
| 644 | * | Service Class (1 byte) | |
| 645 | * +----------------------------------+ |
| 646 | * | AuthProtocol (1 byte) | |
| 647 | * +----------------------------------+ |
| 648 | */ |
| 649 | void RpcChannelImpl::(const RpcAuth &auth) { |
| 650 | WriteBuffer buffer; |
| 651 | buffer.write(RPC_HEADER_MAGIC, strlen(RPC_HEADER_MAGIC)); |
| 652 | buffer.write(static_cast<char>(RPC_HEADER_VERSION)); |
| 653 | buffer.write(static_cast<char>(0)); //for future feature |
| 654 | buffer.write(static_cast<char>(auth.getProtocol())); |
| 655 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), |
| 656 | key.getConf().getWriteTimeout()); |
| 657 | } |
| 658 | |
| 659 | void RpcChannelImpl::buildConnectionContext( |
| 660 | IpcConnectionContextProto & connectionContext, const RpcAuth & auth) { |
| 661 | connectionContext.set_protocol(key.getProtocol().getProtocol()); |
| 662 | std::string euser = key.getAuth().getUser().getPrincipal(); |
| 663 | std::string ruser = key.getAuth().getUser().getRealUser(); |
| 664 | |
| 665 | if (auth.getMethod() != AuthMethod::TOKEN) { |
| 666 | UserInformationProto * user = connectionContext.mutable_userinfo(); |
| 667 | user->set_effectiveuser(euser); |
| 668 | |
| 669 | if (auth.getMethod() != AuthMethod::SIMPLE) { |
| 670 | if (!ruser.empty() && ruser != euser) { |
| 671 | user->set_realuser(ruser); |
| 672 | } |
| 673 | } |
| 674 | } |
| 675 | } |
| 676 | |
| 677 | void RpcChannelImpl::sendConnectionContent(const RpcAuth & auth) { |
| 678 | WriteBuffer buffer; |
| 679 | IpcConnectionContextProto connectionContext; |
| 680 | RpcRequestHeaderProto ; |
| 681 | buildConnectionContext(connectionContext, auth); |
| 682 | rpcHeader.set_callid(CONNECTION_CONTEXT_CALL_ID); |
| 683 | rpcHeader.set_clientid(client.getClientId()); |
| 684 | rpcHeader.set_retrycount(INVALID_RETRY_COUNT); |
| 685 | rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER); |
| 686 | rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET); |
| 687 | RpcContentWrapper wrapper(&rpcHeader, &connectionContext); |
| 688 | int size = wrapper.getLength(); |
| 689 | buffer.writeBigEndian(size); |
| 690 | wrapper.writeTo(buffer); |
| 691 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), |
| 692 | key.getConf().getWriteTimeout()); |
| 693 | lastActivity = lastIdle = steady_clock::now(); |
| 694 | } |
| 695 | |
| 696 | RpcRemoteCallPtr RpcChannelImpl::getPendingCall(int32_t id) { |
| 697 | unordered_map<int32_t, RpcRemoteCallPtr>::iterator it; |
| 698 | it = pendingCalls.find(id); |
| 699 | |
| 700 | if (it == pendingCalls.end()) { |
| 701 | THROW(HdfsRpcException, |
| 702 | "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot find pending call: id = %d." , |
| 703 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), static_cast<int>(id)); |
| 704 | } |
| 705 | |
| 706 | RpcRemoteCallPtr rc = it->second; |
| 707 | pendingCalls.erase(it); |
| 708 | return rc; |
| 709 | } |
| 710 | |
| 711 | bool RpcChannelImpl::getResponse() { |
| 712 | int idleTimeout = key.getConf().getMaxIdleTime(); |
| 713 | int pingTimeout = key.getConf().getPingTimeout(); |
| 714 | int timeout = key.getConf().getRpcTimeout(); |
| 715 | int interval = pingTimeout < idleTimeout ? pingTimeout : idleTimeout; |
| 716 | interval /= 2; |
| 717 | interval = interval < timeout ? interval : timeout; |
| 718 | steady_clock::time_point s = steady_clock::now(); |
| 719 | |
| 720 | while (client.isRunning()) { |
| 721 | if (in->poll(500)) { |
| 722 | return true; |
| 723 | } |
| 724 | |
| 725 | if (ToMilliSeconds(s, steady_clock::now()) >= interval) { |
| 726 | return false; |
| 727 | } |
| 728 | } |
| 729 | |
| 730 | return false; |
| 731 | } |
| 732 | |
/**
 * Map a server-reported RPC failure onto a more specific local exception
 * type when possible.
 *
 * The HdfsRpcServerException carries the server-side exception class name;
 * UnWrapper::unwrap() rethrows it as the matching local type for any of
 * the listed exception classes, which is then captured into retval. The
 * HdfsIOException arm is left empty so that any other class falls through
 * and the original wrapped exception is returned unchanged.
 *
 * @param e the exception captured from the RPC response.
 * @return the (possibly more specific) exception to report to the caller.
 */
static exception_ptr HandlerRpcResponseException(exception_ptr e) {
    exception_ptr retval = e;

    try {
        rethrow_exception(e);
    } catch (const HdfsRpcServerException & e) {
        UnWrapper < NameNodeStandbyException, RpcNoSuchMethodException, UnsupportedOperationException,
                  AccessControlException, SafeModeException, SaslException > unwrapper(e);

        try {
            unwrapper.unwrap(__FILE__, __LINE__);
        } catch (const NameNodeStandbyException & e) {
            retval = current_exception();
        } catch (const UnsupportedOperationException & e) {
            retval = current_exception();
        } catch (const AccessControlException & e) {
            retval = current_exception();
        } catch (const SafeModeException & e) {
            retval = current_exception();
        } catch (const SaslException & e) {
            retval = current_exception();
        } catch (const RpcNoSuchMethodException & e) {
            retval = current_exception();
        } catch (const HdfsIOException & e) {
            // not one of the specific types above: keep the original retval
        }
    }

    return retval;
}
| 762 | |
| 763 | void RpcChannelImpl::readOneResponse(bool writeLock) { |
| 764 | int readTimeout = key.getConf().getReadTimeout(); |
| 765 | std::vector<char> buffer(128); |
| 766 | RpcResponseHeaderProto ; |
| 767 | RpcResponseHeaderProto::RpcStatusProto status; |
| 768 | uint32_t = 0, bodySize = 0; |
| 769 | in->readBigEndianInt32(readTimeout); |
| 770 | /* |
| 771 | * read response header |
| 772 | */ |
| 773 | headerSize = in->readVarint32(readTimeout); |
| 774 | buffer.resize(headerSize); |
| 775 | in->readFully(&buffer[0], headerSize, readTimeout); |
| 776 | |
| 777 | if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) { |
| 778 | THROW(HdfsRpcException, |
| 779 | "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header." , |
| 780 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str()) |
| 781 | } |
| 782 | |
| 783 | lastActivity = steady_clock::now(); |
| 784 | status = curRespHeader.status(); |
| 785 | |
| 786 | if (RpcResponseHeaderProto_RpcStatusProto_SUCCESS == status) { |
| 787 | /* |
| 788 | * on success, read response body |
| 789 | */ |
| 790 | RpcRemoteCallPtr rc; |
| 791 | |
| 792 | if (writeLock) { |
| 793 | lock_guard<mutex> lock(writeMut); |
| 794 | rc = getPendingCall(curRespHeader.callid()); |
| 795 | } else { |
| 796 | rc = getPendingCall(curRespHeader.callid()); |
| 797 | } |
| 798 | |
| 799 | bodySize = in->readVarint32(readTimeout); |
| 800 | buffer.resize(bodySize); |
| 801 | |
| 802 | if (bodySize > 0) { |
| 803 | in->readFully(&buffer[0], bodySize, readTimeout); |
| 804 | } |
| 805 | |
| 806 | Message * response = rc->getCall().getResponse(); |
| 807 | |
| 808 | if (!response->ParseFromArray(&buffer[0], bodySize)) { |
| 809 | THROW(HdfsRpcException, |
| 810 | "RPC channel to \"%s:%s\" got protocol mismatch: rpc channel cannot parse response." , |
| 811 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str()) |
| 812 | } |
| 813 | |
| 814 | rc->done(); |
| 815 | } else { |
| 816 | /* |
| 817 | * on error, read error class and message |
| 818 | */ |
| 819 | std::string errClass, errMessage; |
| 820 | errClass = curRespHeader.exceptionclassname(); |
| 821 | errMessage = curRespHeader.errormsg(); |
| 822 | |
| 823 | if (RpcResponseHeaderProto_RpcStatusProto_ERROR == status) { |
| 824 | RpcRemoteCallPtr rc; |
| 825 | { |
| 826 | lock_guard<mutex> lock(writeMut); |
| 827 | rc = getPendingCall(curRespHeader.callid()); |
| 828 | } |
| 829 | |
| 830 | try { |
| 831 | THROW(HdfsRpcServerException, "%s: %s" , |
| 832 | errClass.c_str(), errMessage.c_str()); |
| 833 | } catch (HdfsRpcServerException & e) { |
| 834 | e.setErrClass(errClass); |
| 835 | e.setErrMsg(errMessage); |
| 836 | rc->cancel(HandlerRpcResponseException(current_exception())); |
| 837 | } |
| 838 | } else { /*fatal*/ |
| 839 | assert(RpcResponseHeaderProto_RpcStatusProto_FATAL == status); |
| 840 | |
| 841 | if (errClass.empty()) { |
| 842 | THROW(HdfsRpcException, "%s: %s" , |
| 843 | errClass.c_str(), errMessage.c_str()); |
| 844 | } |
| 845 | |
| 846 | try { |
| 847 | THROW(HdfsRpcServerException, "%s: %s" , errClass.c_str(), |
| 848 | errMessage.c_str()); |
| 849 | } catch (HdfsRpcServerException & e) { |
| 850 | e.setErrClass(errClass); |
| 851 | e.setErrMsg(errMessage); |
| 852 | rethrow_exception(HandlerRpcResponseException(current_exception())); |
| 853 | } |
| 854 | } |
| 855 | } |
| 856 | } |
| 857 | |
| 858 | } |
| 859 | } |
| 860 | |