1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "Exception.h"
29#include "ExceptionInternal.h"
30#include "IpcConnectionContext.pb.h"
31#include "Logger.h"
32#include "RpcChannel.h"
33#include "RpcClient.h"
34#include "RpcContentWrapper.h"
35#include "RpcHeader.pb.h"
36#include "RpcHeader.pb.h"
37#include "server/RpcHelper.h"
38#include "Thread.h"
39#include "WriteBuffer.h"
40
41#include <google/protobuf/io/coded_stream.h>
42
/* Magic bytes that open the connection header (see sendConnectionHeader). */
#define RPC_HEADER_MAGIC "hrpc"
/* Version byte written right after the magic in the connection header. */
#define RPC_HEADER_VERSION 9
/* Identifier for protobuf serialization; not referenced by the code visible in this file. */
#define SERIALIZATION_TYPE_PROTOBUF 0
/*
 * Call id used for the connection context request (see sendConnectionContent).
 * Parenthesized so that the negative literal always expands as a single operand.
 */
#define CONNECTION_CONTEXT_CALL_ID (-3)
47
48using namespace ::google::protobuf;
49using namespace google::protobuf::io;
50
51namespace Hdfs {
52namespace Internal {
53
54RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, RpcClient & c) :
55 refs(0), available(false), key(k), client(c) {
56 sock = shared_ptr<Socket>(new TcpSocketImpl);
57 sock->setLingerTimeout(k.getConf().getLingerTimeout());
58 in = shared_ptr<BufferedSocketReader>(
59 new BufferedSocketReaderImpl(
60 *static_cast<TcpSocketImpl *>(sock.get())));
61 lastActivity = lastIdle = steady_clock::now();
62}
63
64RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, Socket * s,
65 BufferedSocketReader * in, RpcClient & c) :
66 refs(0), available(false), key(k), client(c) {
67 sock = shared_ptr<Socket>(s);
68 this->in = shared_ptr<BufferedSocketReader>(in);
69 lastActivity = lastIdle = steady_clock::now();
70}
71
72RpcChannelImpl::~RpcChannelImpl() {
73 assert(pendingCalls.empty());
74 assert(refs == 0);
75
76 if (available) {
77 sock->close();
78 }
79}
80
81void RpcChannelImpl::close(bool immediate) {
82 lock_guard<mutex> lock(writeMut);
83 --refs;
84 assert(refs >= 0);
85
86 if (immediate && !refs) {
87 assert(pendingCalls.empty());
88 available = false;
89 sock->close();
90 }
91}
92
93void RpcChannelImpl::sendSaslMessage(RpcSaslProto * msg, Message * resp) {
94 int totalLen;
95 WriteBuffer buffer;
96 RpcRequestHeaderProto rpcHeader;
97 rpcHeader.set_callid(AuthProtocol::SASL);
98 rpcHeader.set_clientid(client.getClientId());
99 rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
100 rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
101 rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
102 RpcContentWrapper wrapper(&rpcHeader, msg);
103 totalLen = wrapper.getLength();
104 buffer.writeBigEndian(totalLen);
105 wrapper.writeTo(buffer);
106 sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), key.getConf().getWriteTimeout());
107 RpcRemoteCallPtr call(
108 new RpcRemoteCall(RpcCall(false, "sasl message", NULL, resp),
109 AuthProtocol::SASL, client.getClientId()));
110 pendingCalls[AuthProtocol::SASL] = call;
111}
112
113
114const RpcSaslProto_SaslAuth * RpcChannelImpl::createSaslClient(
115 const RepeatedPtrField<RpcSaslProto_SaslAuth> * auths) {
116 const RpcSaslProto_SaslAuth * auth = NULL;
117 Token token;
118
119 for (int i = 0; i < auths->size(); ++i) {
120 auth = &auths->Get(i);
121 RpcAuth method(RpcAuth::ParseMethod(auth->method()));
122
123 if (method.getMethod() == AuthMethod::TOKEN && key.hasToken()) {
124 token = key.getToken();
125 break;
126 } else if (method.getMethod() == AuthMethod::KERBEROS) {
127 break;
128 } else if (method.getMethod() == AuthMethod::SIMPLE) {
129 return auth;
130 } else if (method.getMethod() == AuthMethod::UNKNOWN) {
131 return auth;
132 } else {
133 auth = NULL;
134 }
135 }
136
137 if (!auth) {
138 std::stringstream ss;
139 ss.imbue(std::locale::classic());
140 ss << "Client cannot authenticate via: ";
141
142 for (int i = 0; i < auths->size(); ++i) {
143 auth = &auths->Get(i);
144 ss << auth->mechanism() << ", ";
145 }
146
147 THROW(AccessControlException, "%s", ss.str().c_str());
148 }
149
150 saslClient = shared_ptr<SaslClient>(
151 new SaslClient(*auth, token, key.getAuth().getUser().getPrincipal()));
152 return auth;
153}
154
155std::string RpcChannelImpl::saslEvaluateToken(RpcSaslProto & response, bool serverIsDone) {
156 std::string token;
157
158 if (response.has_token()) {
159 token = saslClient->evaluateChallenge(response.token());
160 } else if (!serverIsDone) {
161 THROW(AccessControlException, "Server challenge contains no token");
162 }
163
164 if (serverIsDone) {
165 if (!saslClient->isComplete()) {
166 THROW(AccessControlException, "Client is out of sync with server");
167 }
168
169 if (!token.empty()) {
170 THROW(AccessControlException, "Client generated spurious response");
171 }
172 }
173
174 return token;
175}
176
177RpcAuth RpcChannelImpl::setupSaslConnection() {
178 RpcAuth retval;
179 RpcSaslProto negotiateRequest, response, msg;
180 negotiateRequest.set_state(RpcSaslProto_SaslState_NEGOTIATE);
181 sendSaslMessage(&negotiateRequest, &response);
182 bool done = false;
183
184 do {
185 readOneResponse(false);
186 msg.Clear();
187
188 switch (response.state()) {
189 case RpcSaslProto_SaslState_NEGOTIATE: {
190 const RpcSaslProto_SaslAuth * auth = createSaslClient(
191 &response.auths());
192 retval = RpcAuth(RpcAuth::ParseMethod(auth->method()));
193
194 if (retval.getMethod() == AuthMethod::SIMPLE) {
195 done = true;
196 } else if (retval.getMethod() == AuthMethod::UNKNOWN) {
197 THROW(AccessControlException, "Unknown auth mechanism");
198 } else {
199 std::string respToken;
200 RpcSaslProto_SaslAuth * respAuth = msg.add_auths();
201 respAuth->CopyFrom(*auth);
202 std::string chanllege;
203
204 if (auth->has_challenge()) {
205 chanllege = auth->challenge();
206 respAuth->clear_challenge();
207 }
208
209 respToken = saslClient->evaluateChallenge(chanllege);
210
211 if (!respToken.empty()) {
212 msg.set_token(respToken);
213 }
214
215 msg.set_state(RpcSaslProto_SaslState_INITIATE);
216 }
217
218 break;
219 }
220
221 case RpcSaslProto_SaslState_CHALLENGE: {
222 if (!saslClient) {
223 THROW(AccessControlException, "Server sent unsolicited challenge");
224 }
225
226 std::string token = saslEvaluateToken(response, false);
227 msg.set_token(token);
228 msg.set_state(RpcSaslProto_SaslState_RESPONSE);
229 break;
230 }
231
232 case RpcSaslProto_SaslState_SUCCESS:
233 if (!saslClient) {
234 retval = RpcAuth(AuthMethod::SIMPLE);
235 } else {
236 saslEvaluateToken(response, true);
237 }
238
239 done = true;
240 break;
241
242 default:
243 break;
244 }
245
246 if (!done) {
247 response.Clear();
248 sendSaslMessage(&msg, &response);
249 }
250 } while (!done);
251
252 return retval;
253}
254
255void RpcChannelImpl::connect() {
256 int sleep = 1;
257 exception_ptr lastError;
258 const RpcConfig & conf = key.getConf();
259 const RpcServerInfo & server = key.getServer();
260 std::string buffer;
261
262 for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) {
263 RpcAuth auth = key.getAuth();
264
265 if (key.hasToken()) {
266 auth.setMethod(AuthMethod::TOKEN);
267 }
268
269 try {
270 while (true) {
271 sock->connect(server.getHost().c_str(), server.getPort().c_str(),
272 conf.getConnectTimeout());
273 sock->setNoDelay(conf.isTcpNoDelay());
274 sendConnectionHeader(auth);
275
276 if (auth.getProtocol() == AuthProtocol::SASL) {
277 auth = setupSaslConnection();
278
279 if (auth.getProtocol() == AuthProtocol::SASL) {
280 //success
281 break;
282 }
283
284 /*
285 * switch to other auth protocol
286 */
287 sock->close();
288 CheckOperationCanceled();
289 } else {
290 break;
291 }
292 }
293
294 auth.setUser(key.getAuth().getUser());
295 sendConnectionContent(auth);
296 available = true;
297 lastActivity = lastIdle = steady_clock::now();
298 return;
299 } catch (const SaslException & e) {
300 /*
301 * Namenode may treat this connect as replay, retry later
302 */
303 sleep = (rand() % 5) + 1;
304 lastError = current_exception();
305 LOG(LOG_ERROR,
306 "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
307 server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
308 } catch (const HdfsNetworkException & e) {
309 sleep = 1;
310 lastError = current_exception();
311 LOG(LOG_ERROR,
312 "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
313 server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
314 } catch (const HdfsTimeoutException & e) {
315 sleep = 1;
316 lastError = current_exception();
317 LOG(LOG_ERROR,
318 "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
319 server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
320 }
321
322 if (i + 1 < conf.getMaxRetryOnConnect()) {
323 LOG(INFO,
324 "Retrying connect to server: \"%s:%s\". Already tried %d time(s)",
325 server.getHost().c_str(), server.getPort().c_str(), i + 1);
326 }
327
328 sock->close();
329 CheckOperationCanceled();
330 sleep_for(seconds(sleep));
331 }
332
333 rethrow_exception(lastError);
334}
335
336exception_ptr RpcChannelImpl::invokeInternal(RpcRemoteCallPtr remote) {
337 const RpcCall & call = remote->getCall();
338 exception_ptr lastError;
339
340 try {
341 if (client.isRunning()) {
342 lock_guard<mutex> lock(writeMut);
343
344 if (!available) {
345 connect();
346 }
347
348 sendRequest(remote);
349 }
350
351 /*
352 * We use one call thread to check response,
353 * other thread will wait on RPC call complete.
354 */
355 while (client.isRunning()) {
356 if (remote->finished()) {
357 /*
358 * Current RPC call has finished.
359 * Wake up another thread to check response.
360 */
361 wakeupOneCaller(remote->getIdentity());
362 break;
363 }
364
365 unique_lock<mutex> lock(readMut, defer_lock_t());
366
367 if (lock.try_lock()) {
368 /*
369 * Current thread will check response.
370 */
371 checkOneResponse();
372 } else {
373 /*
374 * Another thread checks response, just wait.
375 */
376 remote->wait();
377 }
378 }
379 } catch (const HdfsNetworkConnectException & e) {
380 try {
381 NESTED_THROW(HdfsFailoverException,
382 "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
383 call.getName(), key.getServer().getHost().c_str(),
384 key.getServer().getPort().c_str());
385 } catch (const HdfsFailoverException & e) {
386 lastError = current_exception();
387 }
388 } catch (const HdfsNetworkException & e) {
389 try {
390 NESTED_THROW(HdfsRpcException,
391 "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
392 call.getName(), key.getServer().getHost().c_str(),
393 key.getServer().getPort().c_str());
394 } catch (const HdfsRpcException & e) {
395 lastError = current_exception();
396 }
397 } catch (const HdfsTimeoutException & e) {
398 try {
399 NESTED_THROW(HdfsFailoverException,
400 "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
401 call.getName(), key.getServer().getHost().c_str(),
402 key.getServer().getPort().c_str());
403 } catch (const HdfsFailoverException & e) {
404 lastError = current_exception();
405 }
406 } catch (const HdfsRpcException & e) {
407 lastError = current_exception();
408 } catch (const HdfsIOException & e) {
409 try {
410 NESTED_THROW(HdfsRpcException,
411 "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
412 call.getName(), key.getServer().getHost().c_str(),
413 key.getServer().getPort().c_str());
414 } catch (const HdfsRpcException & e) {
415 lastError = current_exception();
416 }
417 }
418
419 return lastError;
420}
421
422void RpcChannelImpl::invoke(const RpcCall & call) {
423 assert(refs > 0);
424 RpcRemoteCallPtr remote;
425 exception_ptr lastError;
426
427 try {
428 bool retry = false;
429
430 do {
431 int32_t id = client.getCallId();
432 remote = RpcRemoteCallPtr(new RpcRemoteCall(call, id, client.getClientId()));
433 lastError = exception_ptr();
434 lastError = invokeInternal(remote);
435
436 if (lastError) {
437 lock_guard<mutex> lock(writeMut);
438 shutdown(lastError);
439
440 if (!retry && call.isIdempotent()) {
441 retry = true;
442 std::string buffer;
443 LOG(LOG_ERROR,
444 "Failed to invoke RPC call \"%s\" on server \"%s:%s\": \n%s",
445 call.getName(), key.getServer().getHost().c_str(),
446 key.getServer().getPort().c_str(),
447 GetExceptionDetail(lastError, buffer));
448 LOG(INFO,
449 "Retry idempotent RPC call \"%s\" on server \"%s:%s\"",
450 call.getName(), key.getServer().getHost().c_str(),
451 key.getServer().getPort().c_str());
452 } else {
453 rethrow_exception(lastError);
454 }
455 } else {
456 break;
457 }
458 } while (retry);
459 } catch (const HdfsRpcServerException & e) {
460 if (!remote->finished()) {
461 /*
462 * a fatal error happened, the caller will unwrap it.
463 */
464 lock_guard<mutex> lock(writeMut);
465 lastError = current_exception();
466 shutdown(lastError);
467 }
468
469 /*
470 * else not a fatal error, check again at the end of this function.
471 */
472 } catch (const HdfsException & e) {
473 lock_guard<mutex> lock(writeMut);
474 lastError = current_exception();
475 shutdown(lastError);
476 }
477
478 /*
479 * if the call is not finished, either failed to setup connection,
480 * or client is closing.
481 */
482 if (!remote->finished() || !client.isRunning()) {
483 lock_guard<mutex> lock(writeMut);
484
485 if (lastError == exception_ptr()) {
486 try {
487 THROW(Hdfs::HdfsRpcException,
488 "Failed to invoke RPC call \"%s\", RPC channel to \"%s:%s\" is to be closed since RpcClient is closing",
489 call.getName(), key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
490 } catch (...) {
491 lastError = current_exception();
492 }
493 }
494
495 /*
496 * wake up all.
497 */
498 shutdown(lastError);
499 rethrow_exception(lastError);
500 }
501
502 remote->check();
503}
504
505void RpcChannelImpl::shutdown(exception_ptr reason) {
506 assert(reason != exception_ptr());
507 available = false;
508 cleanupPendingCalls(reason);
509 sock->close();
510}
511
512void RpcChannelImpl::wakeupOneCaller(int32_t id) {
513 lock_guard<mutex> lock(writeMut);
514 unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
515 e = pendingCalls.end();
516
517 for (s = pendingCalls.begin(); s != e; ++s) {
518 if (s->first != id) {
519 s->second->wakeup();
520 return;
521 }
522 }
523}
524
525void RpcChannelImpl::sendRequest(RpcRemoteCallPtr remote) {
526 WriteBuffer buffer;
527 assert(true == available);
528 remote->serialize(key.getProtocol(), buffer);
529 sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
530 key.getConf().getWriteTimeout());
531 uint32_t id = remote->getIdentity();
532 pendingCalls[id] = remote;
533 lastActivity = lastIdle = steady_clock::now();
534}
535
536void RpcChannelImpl::cleanupPendingCalls(exception_ptr reason) {
537 assert(!writeMut.try_lock());
538 unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
539 e = pendingCalls.end();
540
541 for (s = pendingCalls.begin(); s != e; ++s) {
542 s->second->cancel(reason);
543 }
544
545 pendingCalls.clear();
546}
547
548void RpcChannelImpl::checkOneResponse() {
549 int ping = key.getConf().getPingTimeout();
550 int timeout = key.getConf().getRpcTimeout();
551 steady_clock::time_point start = steady_clock::now();
552
553 while (client.isRunning()) {
554 if (getResponse()) {
555 readOneResponse(true);
556 return;
557 } else {
558 if (ping > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= ping) {
559 lock_guard<mutex> lock(writeMut);
560 sendPing();
561 }
562 }
563
564 if (timeout > 0 && ToMilliSeconds(start, steady_clock::now()) >= timeout) {
565 try {
566 THROW(Hdfs::HdfsTimeoutException, "Timeout when wait for response from RPC channel \"%s:%s\"",
567 key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
568 } catch (...) {
569 NESTED_THROW(Hdfs::HdfsRpcException, "Timeout when wait for response from RPC channel \"%s:%s\"",
570 key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
571 }
572 }
573 }
574}
575
576void RpcChannelImpl::sendPing() {
577 static const std::vector<char> pingRequest = RpcRemoteCall::GetPingRequest(client.getClientId());
578
579 if (available) {
580 LOG(INFO,
581 "RPC channel to \"%s:%s\" got no response or idle for %d milliseconds, sending ping.",
582 key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), key.getConf().getPingTimeout());
583 sock->writeFully(&pingRequest[0], pingRequest.size(), key.getConf().getWriteTimeout());
584 lastActivity = steady_clock::now();
585 }
586}
587
588bool RpcChannelImpl::checkIdle() {
589 unique_lock<mutex> lock(writeMut, defer_lock_t());
590
591 if (lock.try_lock()) {
592 if (!pendingCalls.empty() || refs > 0) {
593 lastIdle = steady_clock::now();
594 return false;
595 }
596
597 int idle = key.getConf().getMaxIdleTime();
598 int ping = key.getConf().getPingTimeout();
599
600 try {
601 //close the connection if idle timeout
602 if (ToMilliSeconds(lastIdle, steady_clock::now()) >= idle) {
603 sock->close();
604 return true;
605 }
606
607 //send ping
608 if (ping > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= ping) {
609 sendPing();
610 }
611 } catch (...) {
612 std::string buffer;
613 LOG(LOG_ERROR,
614 "Failed to send ping via idle RPC channel to server \"%s:%s\": "
615 "\n%s",
616 key.getServer().getHost().c_str(),
617 key.getServer().getPort().c_str(),
618 GetExceptionDetail(current_exception(), buffer));
619 sock->close();
620 return true;
621 }
622 }
623
624 return false;
625}
626
627void RpcChannelImpl::waitForExit() {
628 assert(!client.isRunning());
629
630 while (refs != 0) {
631 sleep_for(milliseconds(100));
632 }
633
634 assert(pendingCalls.empty());
635}
636
637/**
638 * Write the connection header - this is sent when connection is established
639 * +----------------------------------+
640 * | "hrpc" 4 bytes |
641 * +----------------------------------+
642 * | Version (1 byte) |
643 * +----------------------------------+
644 * | Service Class (1 byte) |
645 * +----------------------------------+
646 * | AuthProtocol (1 byte) |
647 * +----------------------------------+
648 */
649void RpcChannelImpl::sendConnectionHeader(const RpcAuth &auth) {
650 WriteBuffer buffer;
651 buffer.write(RPC_HEADER_MAGIC, strlen(RPC_HEADER_MAGIC));
652 buffer.write(static_cast<char>(RPC_HEADER_VERSION));
653 buffer.write(static_cast<char>(0)); //for future feature
654 buffer.write(static_cast<char>(auth.getProtocol()));
655 sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
656 key.getConf().getWriteTimeout());
657}
658
659void RpcChannelImpl::buildConnectionContext(
660 IpcConnectionContextProto & connectionContext, const RpcAuth & auth) {
661 connectionContext.set_protocol(key.getProtocol().getProtocol());
662 std::string euser = key.getAuth().getUser().getPrincipal();
663 std::string ruser = key.getAuth().getUser().getRealUser();
664
665 if (auth.getMethod() != AuthMethod::TOKEN) {
666 UserInformationProto * user = connectionContext.mutable_userinfo();
667 user->set_effectiveuser(euser);
668
669 if (auth.getMethod() != AuthMethod::SIMPLE) {
670 if (!ruser.empty() && ruser != euser) {
671 user->set_realuser(ruser);
672 }
673 }
674 }
675}
676
677void RpcChannelImpl::sendConnectionContent(const RpcAuth & auth) {
678 WriteBuffer buffer;
679 IpcConnectionContextProto connectionContext;
680 RpcRequestHeaderProto rpcHeader;
681 buildConnectionContext(connectionContext, auth);
682 rpcHeader.set_callid(CONNECTION_CONTEXT_CALL_ID);
683 rpcHeader.set_clientid(client.getClientId());
684 rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
685 rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
686 rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
687 RpcContentWrapper wrapper(&rpcHeader, &connectionContext);
688 int size = wrapper.getLength();
689 buffer.writeBigEndian(size);
690 wrapper.writeTo(buffer);
691 sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
692 key.getConf().getWriteTimeout());
693 lastActivity = lastIdle = steady_clock::now();
694}
695
696RpcRemoteCallPtr RpcChannelImpl::getPendingCall(int32_t id) {
697 unordered_map<int32_t, RpcRemoteCallPtr>::iterator it;
698 it = pendingCalls.find(id);
699
700 if (it == pendingCalls.end()) {
701 THROW(HdfsRpcException,
702 "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot find pending call: id = %d.",
703 key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), static_cast<int>(id));
704 }
705
706 RpcRemoteCallPtr rc = it->second;
707 pendingCalls.erase(it);
708 return rc;
709}
710
711bool RpcChannelImpl::getResponse() {
712 int idleTimeout = key.getConf().getMaxIdleTime();
713 int pingTimeout = key.getConf().getPingTimeout();
714 int timeout = key.getConf().getRpcTimeout();
715 int interval = pingTimeout < idleTimeout ? pingTimeout : idleTimeout;
716 interval /= 2;
717 interval = interval < timeout ? interval : timeout;
718 steady_clock::time_point s = steady_clock::now();
719
720 while (client.isRunning()) {
721 if (in->poll(500)) {
722 return true;
723 }
724
725 if (ToMilliSeconds(s, steady_clock::now()) >= interval) {
726 return false;
727 }
728 }
729
730 return false;
731}
732
733static exception_ptr HandlerRpcResponseException(exception_ptr e) {
734 exception_ptr retval = e;
735
736 try {
737 rethrow_exception(e);
738 } catch (const HdfsRpcServerException & e) {
739 UnWrapper < NameNodeStandbyException, RpcNoSuchMethodException, UnsupportedOperationException,
740 AccessControlException, SafeModeException, SaslException > unwrapper(e);
741
742 try {
743 unwrapper.unwrap(__FILE__, __LINE__);
744 } catch (const NameNodeStandbyException & e) {
745 retval = current_exception();
746 } catch (const UnsupportedOperationException & e) {
747 retval = current_exception();
748 } catch (const AccessControlException & e) {
749 retval = current_exception();
750 } catch (const SafeModeException & e) {
751 retval = current_exception();
752 } catch (const SaslException & e) {
753 retval = current_exception();
754 } catch (const RpcNoSuchMethodException & e) {
755 retval = current_exception();
756 } catch (const HdfsIOException & e) {
757 }
758 }
759
760 return retval;
761}
762
763void RpcChannelImpl::readOneResponse(bool writeLock) {
764 int readTimeout = key.getConf().getReadTimeout();
765 std::vector<char> buffer(128);
766 RpcResponseHeaderProto curRespHeader;
767 RpcResponseHeaderProto::RpcStatusProto status;
768 uint32_t headerSize = 0, bodySize = 0;
769 in->readBigEndianInt32(readTimeout);
770 /*
771 * read response header
772 */
773 headerSize = in->readVarint32(readTimeout);
774 buffer.resize(headerSize);
775 in->readFully(&buffer[0], headerSize, readTimeout);
776
777 if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) {
778 THROW(HdfsRpcException,
779 "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header.",
780 key.getServer().getHost().c_str(), key.getServer().getPort().c_str())
781 }
782
783 lastActivity = steady_clock::now();
784 status = curRespHeader.status();
785
786 if (RpcResponseHeaderProto_RpcStatusProto_SUCCESS == status) {
787 /*
788 * on success, read response body
789 */
790 RpcRemoteCallPtr rc;
791
792 if (writeLock) {
793 lock_guard<mutex> lock(writeMut);
794 rc = getPendingCall(curRespHeader.callid());
795 } else {
796 rc = getPendingCall(curRespHeader.callid());
797 }
798
799 bodySize = in->readVarint32(readTimeout);
800 buffer.resize(bodySize);
801
802 if (bodySize > 0) {
803 in->readFully(&buffer[0], bodySize, readTimeout);
804 }
805
806 Message * response = rc->getCall().getResponse();
807
808 if (!response->ParseFromArray(&buffer[0], bodySize)) {
809 THROW(HdfsRpcException,
810 "RPC channel to \"%s:%s\" got protocol mismatch: rpc channel cannot parse response.",
811 key.getServer().getHost().c_str(), key.getServer().getPort().c_str())
812 }
813
814 rc->done();
815 } else {
816 /*
817 * on error, read error class and message
818 */
819 std::string errClass, errMessage;
820 errClass = curRespHeader.exceptionclassname();
821 errMessage = curRespHeader.errormsg();
822
823 if (RpcResponseHeaderProto_RpcStatusProto_ERROR == status) {
824 RpcRemoteCallPtr rc;
825 {
826 lock_guard<mutex> lock(writeMut);
827 rc = getPendingCall(curRespHeader.callid());
828 }
829
830 try {
831 THROW(HdfsRpcServerException, "%s: %s",
832 errClass.c_str(), errMessage.c_str());
833 } catch (HdfsRpcServerException & e) {
834 e.setErrClass(errClass);
835 e.setErrMsg(errMessage);
836 rc->cancel(HandlerRpcResponseException(current_exception()));
837 }
838 } else { /*fatal*/
839 assert(RpcResponseHeaderProto_RpcStatusProto_FATAL == status);
840
841 if (errClass.empty()) {
842 THROW(HdfsRpcException, "%s: %s",
843 errClass.c_str(), errMessage.c_str());
844 }
845
846 try {
847 THROW(HdfsRpcServerException, "%s: %s", errClass.c_str(),
848 errMessage.c_str());
849 } catch (HdfsRpcServerException & e) {
850 e.setErrClass(errClass);
851 e.setErrMsg(errMessage);
852 rethrow_exception(HandlerRpcResponseException(current_exception()));
853 }
854 }
855 }
856}
857
858}
859}
860