1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "Exception.h" |
29 | #include "ExceptionInternal.h" |
30 | #include "IpcConnectionContext.pb.h" |
31 | #include "Logger.h" |
32 | #include "RpcChannel.h" |
33 | #include "RpcClient.h" |
34 | #include "RpcContentWrapper.h" |
35 | #include "RpcHeader.pb.h" |
36 | #include "RpcHeader.pb.h" |
37 | #include "server/RpcHelper.h" |
38 | #include "Thread.h" |
39 | #include "WriteBuffer.h" |
40 | |
41 | #include <google/protobuf/io/coded_stream.h> |
42 | |
// Connection-header constants (see sendConnectionHeader()): the "hrpc"
// magic and protocol version 9 of the Hadoop RPC wire format.
// The macro names were lost in a bad edit; restored from their uses below.
#define RPC_HEADER_MAGIC "hrpc"
#define RPC_HEADER_VERSION 9
#define SERIALIZATION_TYPE_PROTOBUF 0
#define CONNECTION_CONTEXT_CALL_ID -3
47 | |
48 | using namespace ::google::protobuf; |
49 | using namespace google::protobuf::io; |
50 | |
51 | namespace Hdfs { |
52 | namespace Internal { |
53 | |
/**
 * Construct a channel that owns a freshly created TCP socket.
 * The socket is not connected yet; connect() does that lazily.
 * @param k the key (server address, auth, protocol, conf) of this channel.
 * @param c the owning RPC client.
 */
RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, RpcClient & c) :
    refs(0), available(false), key(k), client(c) {
    sock = shared_ptr<Socket>(new TcpSocketImpl);
    sock->setLingerTimeout(k.getConf().getLingerTimeout());
    // The reader wraps the same socket to buffer incoming responses.
    in = shared_ptr<BufferedSocketReader>(
             new BufferedSocketReaderImpl(
                 *static_cast<TcpSocketImpl *>(sock.get())));
    lastActivity = lastIdle = steady_clock::now();
}
63 | |
/**
 * Construct a channel around an externally supplied socket and reader
 * (takes ownership of both). Primarily used for testing/injection.
 * @param k  the key of this channel.
 * @param s  socket to adopt; ownership is transferred.
 * @param in buffered reader to adopt; ownership is transferred.
 * @param c  the owning RPC client.
 */
RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, Socket * s,
                               BufferedSocketReader * in, RpcClient & c) :
    refs(0), available(false), key(k), client(c) {
    sock = shared_ptr<Socket>(s);
    this->in = shared_ptr<BufferedSocketReader>(in);
    lastActivity = lastIdle = steady_clock::now();
}
71 | |
/**
 * Destructor: the channel must no longer be referenced and must have no
 * outstanding calls; close the socket if it is still usable.
 */
RpcChannelImpl::~RpcChannelImpl() {
    assert(pendingCalls.empty());
    assert(refs == 0);

    if (available) {
        sock->close();
    }
}
80 | |
81 | void RpcChannelImpl::close(bool immediate) { |
82 | lock_guard<mutex> lock(writeMut); |
83 | --refs; |
84 | assert(refs >= 0); |
85 | |
86 | if (immediate && !refs) { |
87 | assert(pendingCalls.empty()); |
88 | available = false; |
89 | sock->close(); |
90 | } |
91 | } |
92 | |
93 | void RpcChannelImpl::sendSaslMessage(RpcSaslProto * msg, Message * resp) { |
94 | int totalLen; |
95 | WriteBuffer buffer; |
96 | RpcRequestHeaderProto ; |
97 | rpcHeader.set_callid(AuthProtocol::SASL); |
98 | rpcHeader.set_clientid(client.getClientId()); |
99 | rpcHeader.set_retrycount(INVALID_RETRY_COUNT); |
100 | rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER); |
101 | rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET); |
102 | RpcContentWrapper wrapper(&rpcHeader, msg); |
103 | totalLen = wrapper.getLength(); |
104 | buffer.writeBigEndian(totalLen); |
105 | wrapper.writeTo(buffer); |
106 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), key.getConf().getWriteTimeout()); |
107 | RpcRemoteCallPtr call( |
108 | new RpcRemoteCall(RpcCall(false, "sasl message" , NULL, resp), |
109 | AuthProtocol::SASL, client.getClientId())); |
110 | pendingCalls[AuthProtocol::SASL] = call; |
111 | } |
112 | |
113 | |
/**
 * Pick a SASL mechanism from the list advertised by the server and build
 * the SaslClient used for the rest of the negotiation.
 *
 * Selection order as implemented below: TOKEN is chosen when we hold a
 * delegation token, otherwise KERBEROS; SIMPLE and UNKNOWN short-circuit
 * and are returned without constructing a SaslClient.
 *
 * @param auths the authentication options offered by the server.
 * @return the chosen entry (owned by the input list), never NULL.
 * @throw AccessControlException if no offered mechanism is usable.
 */
const RpcSaslProto_SaslAuth * RpcChannelImpl::createSaslClient(
    const RepeatedPtrField<RpcSaslProto_SaslAuth> * auths) {
    const RpcSaslProto_SaslAuth * auth = NULL;
    Token token;

    for (int i = 0; i < auths->size(); ++i) {
        auth = &auths->Get(i);
        RpcAuth method(RpcAuth::ParseMethod(auth->method()));

        if (method.getMethod() == AuthMethod::TOKEN && key.hasToken()) {
            // We hold a token for this server: use token authentication.
            token = key.getToken();
            break;
        } else if (method.getMethod() == AuthMethod::KERBEROS) {
            break;
        } else if (method.getMethod() == AuthMethod::SIMPLE) {
            // No SASL handshake needed; caller falls back to simple auth.
            return auth;
        } else if (method.getMethod() == AuthMethod::UNKNOWN) {
            return auth;
        } else {
            auth = NULL;  // unusable mechanism, keep scanning
        }
    }

    if (!auth) {
        std::stringstream ss;
        ss.imbue(std::locale::classic());
        ss << "Client cannot authenticate via: ";

        // List every mechanism the server offered in the error message.
        for (int i = 0; i < auths->size(); ++i) {
            auth = &auths->Get(i);
            ss << auth->mechanism() << ", ";
        }

        THROW(AccessControlException, "%s", ss.str().c_str());
    }

    saslClient = shared_ptr<SaslClient>(
                     new SaslClient(*auth, token, key.getAuth().getUser().getPrincipal()));
    return auth;
}
154 | |
155 | std::string RpcChannelImpl::saslEvaluateToken(RpcSaslProto & response, bool serverIsDone) { |
156 | std::string token; |
157 | |
158 | if (response.has_token()) { |
159 | token = saslClient->evaluateChallenge(response.token()); |
160 | } else if (!serverIsDone) { |
161 | THROW(AccessControlException, "Server challenge contains no token" ); |
162 | } |
163 | |
164 | if (serverIsDone) { |
165 | if (!saslClient->isComplete()) { |
166 | THROW(AccessControlException, "Client is out of sync with server" ); |
167 | } |
168 | |
169 | if (!token.empty()) { |
170 | THROW(AccessControlException, "Client generated spurious response" ); |
171 | } |
172 | } |
173 | |
174 | return token; |
175 | } |
176 | |
/**
 * Drive the client side of the SASL negotiation state machine until the
 * server reports success (or the server accepts simple auth).
 * @return the authentication method that was finally negotiated.
 * @throw AccessControlException on negotiation failure.
 */
RpcAuth RpcChannelImpl::setupSaslConnection() {
    RpcAuth retval;
    RpcSaslProto negotiateRequest, response, msg;
    negotiateRequest.set_state(RpcSaslProto_SaslState_NEGOTIATE);
    sendSaslMessage(&negotiateRequest, &response);
    bool done = false;

    do {
        // Blocks until the SASL reply registered by sendSaslMessage()
        // has been parsed into `response`.
        readOneResponse(false);
        msg.Clear();

        switch (response.state()) {
        case RpcSaslProto_SaslState_NEGOTIATE: {
            // Server advertised its mechanisms: pick one and reply INITIATE.
            const RpcSaslProto_SaslAuth * auth = createSaslClient(
                        &response.auths());
            retval = RpcAuth(RpcAuth::ParseMethod(auth->method()));

            if (retval.getMethod() == AuthMethod::SIMPLE) {
                // Server accepts simple auth: no handshake needed.
                done = true;
            } else if (retval.getMethod() == AuthMethod::UNKNOWN) {
                THROW(AccessControlException, "Unknown auth mechanism");
            } else {
                std::string respToken;
                RpcSaslProto_SaslAuth * respAuth = msg.add_auths();
                respAuth->CopyFrom(*auth);
                std::string chanllege;  // (sic) the server's initial challenge, if any

                if (auth->has_challenge()) {
                    chanllege = auth->challenge();
                    // The challenge is consumed here; do not echo it back.
                    respAuth->clear_challenge();
                }

                respToken = saslClient->evaluateChallenge(chanllege);

                if (!respToken.empty()) {
                    msg.set_token(respToken);
                }

                msg.set_state(RpcSaslProto_SaslState_INITIATE);
            }

            break;
        }

        case RpcSaslProto_SaslState_CHALLENGE: {
            if (!saslClient) {
                THROW(AccessControlException, "Server sent unsolicited challenge");
            }

            std::string token = saslEvaluateToken(response, false);
            msg.set_token(token);
            msg.set_state(RpcSaslProto_SaslState_RESPONSE);
            break;
        }

        case RpcSaslProto_SaslState_SUCCESS:
            // No SaslClient means the server accepted simple auth directly;
            // otherwise validate the final handshake state.
            if (!saslClient) {
                retval = RpcAuth(AuthMethod::SIMPLE);
            } else {
                saslEvaluateToken(response, true);
            }

            done = true;
            break;

        default:
            break;
        }

        if (!done) {
            response.Clear();
            sendSaslMessage(&msg, &response);
        }
    } while (!done);

    return retval;
}
254 | |
/**
 * Establish the RPC connection: TCP connect, connection header, optional
 * SASL handshake, then the connection context. Retries up to
 * getMaxRetryOnConnect() times on network/timeout/SASL failures; the last
 * failure is rethrown when all attempts are exhausted.
 */
void RpcChannelImpl::connect() {
    int sleep = 1;
    exception_ptr lastError;
    const RpcConfig & conf = key.getConf();
    const RpcServerInfo & server = key.getServer();
    std::string buffer;

    for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) {
        RpcAuth auth = key.getAuth();

        if (key.hasToken()) {
            // Holding a delegation token overrides the configured method.
            auth.setMethod(AuthMethod::TOKEN);
        }

        try {
            while (true) {
                sock->connect(server.getHost().c_str(), server.getPort().c_str(),
                              conf.getConnectTimeout());
                sock->setNoDelay(conf.isTcpNoDelay());
                sendConnectionHeader(auth);

                if (auth.getProtocol() == AuthProtocol::SASL) {
                    auth = setupSaslConnection();

                    if (auth.getProtocol() == AuthProtocol::SASL) {
                        //success
                        break;
                    }

                    /*
                     * switch to other auth protocol
                     */
                    // Negotiation downgraded the protocol: reconnect from
                    // scratch with the new auth settings.
                    sock->close();
                    CheckOperationCanceled();
                } else {
                    break;
                }
            }

            auth.setUser(key.getAuth().getUser());
            sendConnectionContent(auth);
            available = true;
            lastActivity = lastIdle = steady_clock::now();
            return;
        } catch (const SaslException & e) {
            /*
             * Namenode may treat this connect as replay, retry later
             */
            sleep = (rand() % 5) + 1;  // randomized 1-5s backoff
            lastError = current_exception();
            LOG(LOG_ERROR,
                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsNetworkException & e) {
            sleep = 1;
            lastError = current_exception();
            LOG(LOG_ERROR,
                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsTimeoutException & e) {
            sleep = 1;
            lastError = current_exception();
            LOG(LOG_ERROR,
                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
        }

        if (i + 1 < conf.getMaxRetryOnConnect()) {
            LOG(INFO,
                "Retrying connect to server: \"%s:%s\". Already tried %d time(s)",
                server.getHost().c_str(), server.getPort().c_str(), i + 1);
        }

        sock->close();
        CheckOperationCanceled();
        sleep_for(seconds(sleep));
    }

    // NOTE(review): assumes getMaxRetryOnConnect() >= 1, otherwise
    // lastError is empty here — confirm the config enforces that.
    rethrow_exception(lastError);
}
335 | |
/**
 * Send one RPC request and wait until it completes, translating transport
 * failures into the exception types callers retry on.
 *
 * Response reading is cooperative: whichever waiting thread wins readMut
 * polls the socket and dispatches responses for everyone; the others
 * block on their own call until woken.
 *
 * @param remote the call to send.
 * @return an exception_ptr describing the failure, or empty on success.
 */
exception_ptr RpcChannelImpl::invokeInternal(RpcRemoteCallPtr remote) {
    const RpcCall & call = remote->getCall();
    exception_ptr lastError;

    try {
        if (client.isRunning()) {
            lock_guard<mutex> lock(writeMut);

            // Lazily (re)connect under the write lock.
            if (!available) {
                connect();
            }

            sendRequest(remote);
        }

        /*
         * We use one call thread to check response,
         * other thread will wait on RPC call complete.
         */
        while (client.isRunning()) {
            if (remote->finished()) {
                /*
                 * Current RPC call has finished.
                 * Wake up another thread to check response.
                 */
                wakeupOneCaller(remote->getIdentity());
                break;
            }

            unique_lock<mutex> lock(readMut, defer_lock_t());

            if (lock.try_lock()) {
                /*
                 * Current thread will check response.
                 */
                checkOneResponse();
            } else {
                /*
                 * Another thread checks response, just wait.
                 */
                remote->wait();
            }
        }
    } catch (const HdfsNetworkConnectException & e) {
        // Connect failures become failover candidates.
        try {
            NESTED_THROW(HdfsFailoverException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsFailoverException & e) {
            lastError = current_exception();
        }
    } catch (const HdfsNetworkException & e) {
        try {
            NESTED_THROW(HdfsRpcException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsRpcException & e) {
            lastError = current_exception();
        }
    } catch (const HdfsTimeoutException & e) {
        try {
            NESTED_THROW(HdfsFailoverException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsFailoverException & e) {
            lastError = current_exception();
        }
    } catch (const HdfsRpcException & e) {
        lastError = current_exception();
    } catch (const HdfsIOException & e) {
        try {
            NESTED_THROW(HdfsRpcException,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str());
        } catch (const HdfsRpcException & e) {
            lastError = current_exception();
        }
    }

    return lastError;
}
421 | |
/**
 * Invoke one RPC call on this channel, retrying an idempotent call once
 * after a transport failure. On any unrecoverable failure the channel is
 * shut down (failing all pending calls) and the error is rethrown.
 * @param call the RPC call to execute.
 */
void RpcChannelImpl::invoke(const RpcCall & call) {
    assert(refs > 0);
    RpcRemoteCallPtr remote;
    exception_ptr lastError;

    try {
        bool retry = false;

        do {
            // A fresh call id per attempt; the server must not see the
            // retry as the same in-flight call.
            int32_t id = client.getCallId();
            remote = RpcRemoteCallPtr(new RpcRemoteCall(call, id, client.getClientId()));
            lastError = exception_ptr();
            lastError = invokeInternal(remote);

            if (lastError) {
                lock_guard<mutex> lock(writeMut);
                shutdown(lastError);

                if (!retry && call.isIdempotent()) {
                    // First failure of an idempotent call: log and retry once.
                    retry = true;
                    std::string buffer;
                    LOG(LOG_ERROR,
                        "Failed to invoke RPC call \"%s\" on server \"%s:%s\": \n%s",
                        call.getName(), key.getServer().getHost().c_str(),
                        key.getServer().getPort().c_str(),
                        GetExceptionDetail(lastError, buffer));
                    LOG(INFO,
                        "Retry idempotent RPC call \"%s\" on server \"%s:%s\"",
                        call.getName(), key.getServer().getHost().c_str(),
                        key.getServer().getPort().c_str());
                } else {
                    rethrow_exception(lastError);
                }
            } else {
                break;
            }
        } while (retry);
    } catch (const HdfsRpcServerException & e) {
        if (!remote->finished()) {
            /*
             * a fatal error happened, the caller will unwrap it.
             */
            lock_guard<mutex> lock(writeMut);
            lastError = current_exception();
            shutdown(lastError);
        }

        /*
         * else not a fatal error, check again at the end of this function.
         */
    } catch (const HdfsException & e) {
        lock_guard<mutex> lock(writeMut);
        lastError = current_exception();
        shutdown(lastError);
    }

    /*
     * if the call is not finished, either failed to setup connection,
     * or client is closing.
     */
    if (!remote->finished() || !client.isRunning()) {
        lock_guard<mutex> lock(writeMut);

        if (lastError == exception_ptr()) {
            try {
                THROW(Hdfs::HdfsRpcException,
                      "Failed to invoke RPC call \"%s\", RPC channel to \"%s:%s\" is to be closed since RpcClient is closing",
                      call.getName(), key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
            } catch (...) {
                lastError = current_exception();
            }
        }

        /*
         * wake up all.
         */
        shutdown(lastError);
        rethrow_exception(lastError);
    }

    // The call completed; rethrow the per-call error, if the server
    // reported one.
    remote->check();
}
504 | |
505 | void RpcChannelImpl::shutdown(exception_ptr reason) { |
506 | assert(reason != exception_ptr()); |
507 | available = false; |
508 | cleanupPendingCalls(reason); |
509 | sock->close(); |
510 | } |
511 | |
/**
 * Wake up one caller OTHER than the given one.
 *
 * The thread owning call `id` has just finished and is stepping down as
 * the response reader; one of the remaining blocked callers must take
 * over polling the socket — hence the `!=` comparison below is
 * intentional, not a bug.
 *
 * @param id identity of the call whose thread is stepping down.
 */
void RpcChannelImpl::wakeupOneCaller(int32_t id) {
    lock_guard<mutex> lock(writeMut);
    unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
    e = pendingCalls.end();

    for (s = pendingCalls.begin(); s != e; ++s) {
        if (s->first != id) {
            s->second->wakeup();
            return;  // waking a single other caller is sufficient
        }
    }
}
524 | |
525 | void RpcChannelImpl::sendRequest(RpcRemoteCallPtr remote) { |
526 | WriteBuffer buffer; |
527 | assert(true == available); |
528 | remote->serialize(key.getProtocol(), buffer); |
529 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), |
530 | key.getConf().getWriteTimeout()); |
531 | uint32_t id = remote->getIdentity(); |
532 | pendingCalls[id] = remote; |
533 | lastActivity = lastIdle = steady_clock::now(); |
534 | } |
535 | |
/**
 * Cancel every pending call with the given reason and clear the map.
 * Caller must already hold writeMut.
 * @param reason the exception delivered to each cancelled call.
 */
void RpcChannelImpl::cleanupPendingCalls(exception_ptr reason) {
    // Debug-only check that writeMut is held. NOTE(review): try_lock on a
    // std::mutex already owned by this thread is technically undefined;
    // it works on common platforms but is worth confirming.
    assert(!writeMut.try_lock());
    unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
    e = pendingCalls.end();

    for (s = pendingCalls.begin(); s != e; ++s) {
        s->second->cancel(reason);
    }

    pendingCalls.clear();
}
547 | |
/**
 * Wait for and consume one RPC response.
 *
 * While waiting, sends a ping whenever the channel has been quiet for
 * longer than the ping timeout; once the RPC timeout elapses the wait
 * fails with an HdfsRpcException wrapping an HdfsTimeoutException.
 */
void RpcChannelImpl::checkOneResponse() {
    int ping = key.getConf().getPingTimeout();
    int timeout = key.getConf().getRpcTimeout();
    steady_clock::time_point start = steady_clock::now();

    while (client.isRunning()) {
        if (getResponse()) {
            readOneResponse(true);
            return;
        } else {
            // No data yet: keep the connection alive if it has been idle
            // longer than the ping interval.
            if (ping > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= ping) {
                lock_guard<mutex> lock(writeMut);
                sendPing();
            }
        }

        if (timeout > 0 && ToMilliSeconds(start, steady_clock::now()) >= timeout) {
            // THROW-then-NESTED_THROW so the timeout is preserved as the
            // cause of the rpc exception callers see.
            try {
                THROW(Hdfs::HdfsTimeoutException, "Timeout when wait for response from RPC channel \"%s:%s\"",
                      key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
            } catch (...) {
                NESTED_THROW(Hdfs::HdfsRpcException, "Timeout when wait for response from RPC channel \"%s:%s\"",
                             key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
            }
        }
    }
}
575 | |
/**
 * Write a prebuilt ping packet to keep the connection alive.
 *
 * NOTE(review): pingRequest is a function-local static, so it is built
 * exactly once per process using the client id of whichever channel pings
 * first; if multiple RpcClient instances with distinct client ids share
 * this code, later pings carry the first client's id — confirm this is
 * intended.
 */
void RpcChannelImpl::sendPing() {
    static const std::vector<char> pingRequest = RpcRemoteCall::GetPingRequest(client.getClientId());

    if (available) {
        LOG(INFO,
            "RPC channel to \"%s:%s\" got no response or idle for %d milliseconds, sending ping.",
            key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), key.getConf().getPingTimeout());
        sock->writeFully(&pingRequest[0], pingRequest.size(), key.getConf().getWriteTimeout());
        lastActivity = steady_clock::now();
    }
}
587 | |
588 | bool RpcChannelImpl::checkIdle() { |
589 | unique_lock<mutex> lock(writeMut, defer_lock_t()); |
590 | |
591 | if (lock.try_lock()) { |
592 | if (!pendingCalls.empty() || refs > 0) { |
593 | lastIdle = steady_clock::now(); |
594 | return false; |
595 | } |
596 | |
597 | int idle = key.getConf().getMaxIdleTime(); |
598 | int ping = key.getConf().getPingTimeout(); |
599 | |
600 | try { |
601 | //close the connection if idle timeout |
602 | if (ToMilliSeconds(lastIdle, steady_clock::now()) >= idle) { |
603 | sock->close(); |
604 | return true; |
605 | } |
606 | |
607 | //send ping |
608 | if (ping > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= ping) { |
609 | sendPing(); |
610 | } |
611 | } catch (...) { |
612 | std::string buffer; |
613 | LOG(LOG_ERROR, |
614 | "Failed to send ping via idle RPC channel to server \"%s:%s\": " |
615 | "\n%s" , |
616 | key.getServer().getHost().c_str(), |
617 | key.getServer().getPort().c_str(), |
618 | GetExceptionDetail(current_exception(), buffer)); |
619 | sock->close(); |
620 | return true; |
621 | } |
622 | } |
623 | |
624 | return false; |
625 | } |
626 | |
/**
 * Block until every user of this channel has dropped its reference.
 * Busy-waits at 100 ms granularity; only called during client shutdown,
 * so the polling cost is acceptable.
 */
void RpcChannelImpl::waitForExit() {
    assert(!client.isRunning());

    while (refs != 0) {
        sleep_for(milliseconds(100));
    }

    assert(pendingCalls.empty());
}
636 | |
637 | /** |
638 | * Write the connection header - this is sent when connection is established |
639 | * +----------------------------------+ |
640 | * | "hrpc" 4 bytes | |
641 | * +----------------------------------+ |
642 | * | Version (1 byte) | |
643 | * +----------------------------------+ |
644 | * | Service Class (1 byte) | |
645 | * +----------------------------------+ |
646 | * | AuthProtocol (1 byte) | |
647 | * +----------------------------------+ |
648 | */ |
649 | void RpcChannelImpl::(const RpcAuth &auth) { |
650 | WriteBuffer buffer; |
651 | buffer.write(RPC_HEADER_MAGIC, strlen(RPC_HEADER_MAGIC)); |
652 | buffer.write(static_cast<char>(RPC_HEADER_VERSION)); |
653 | buffer.write(static_cast<char>(0)); //for future feature |
654 | buffer.write(static_cast<char>(auth.getProtocol())); |
655 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), |
656 | key.getConf().getWriteTimeout()); |
657 | } |
658 | |
659 | void RpcChannelImpl::buildConnectionContext( |
660 | IpcConnectionContextProto & connectionContext, const RpcAuth & auth) { |
661 | connectionContext.set_protocol(key.getProtocol().getProtocol()); |
662 | std::string euser = key.getAuth().getUser().getPrincipal(); |
663 | std::string ruser = key.getAuth().getUser().getRealUser(); |
664 | |
665 | if (auth.getMethod() != AuthMethod::TOKEN) { |
666 | UserInformationProto * user = connectionContext.mutable_userinfo(); |
667 | user->set_effectiveuser(euser); |
668 | |
669 | if (auth.getMethod() != AuthMethod::SIMPLE) { |
670 | if (!ruser.empty() && ruser != euser) { |
671 | user->set_realuser(ruser); |
672 | } |
673 | } |
674 | } |
675 | } |
676 | |
677 | void RpcChannelImpl::sendConnectionContent(const RpcAuth & auth) { |
678 | WriteBuffer buffer; |
679 | IpcConnectionContextProto connectionContext; |
680 | RpcRequestHeaderProto ; |
681 | buildConnectionContext(connectionContext, auth); |
682 | rpcHeader.set_callid(CONNECTION_CONTEXT_CALL_ID); |
683 | rpcHeader.set_clientid(client.getClientId()); |
684 | rpcHeader.set_retrycount(INVALID_RETRY_COUNT); |
685 | rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER); |
686 | rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET); |
687 | RpcContentWrapper wrapper(&rpcHeader, &connectionContext); |
688 | int size = wrapper.getLength(); |
689 | buffer.writeBigEndian(size); |
690 | wrapper.writeTo(buffer); |
691 | sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), |
692 | key.getConf().getWriteTimeout()); |
693 | lastActivity = lastIdle = steady_clock::now(); |
694 | } |
695 | |
696 | RpcRemoteCallPtr RpcChannelImpl::getPendingCall(int32_t id) { |
697 | unordered_map<int32_t, RpcRemoteCallPtr>::iterator it; |
698 | it = pendingCalls.find(id); |
699 | |
700 | if (it == pendingCalls.end()) { |
701 | THROW(HdfsRpcException, |
702 | "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot find pending call: id = %d." , |
703 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), static_cast<int>(id)); |
704 | } |
705 | |
706 | RpcRemoteCallPtr rc = it->second; |
707 | pendingCalls.erase(it); |
708 | return rc; |
709 | } |
710 | |
711 | bool RpcChannelImpl::getResponse() { |
712 | int idleTimeout = key.getConf().getMaxIdleTime(); |
713 | int pingTimeout = key.getConf().getPingTimeout(); |
714 | int timeout = key.getConf().getRpcTimeout(); |
715 | int interval = pingTimeout < idleTimeout ? pingTimeout : idleTimeout; |
716 | interval /= 2; |
717 | interval = interval < timeout ? interval : timeout; |
718 | steady_clock::time_point s = steady_clock::now(); |
719 | |
720 | while (client.isRunning()) { |
721 | if (in->poll(500)) { |
722 | return true; |
723 | } |
724 | |
725 | if (ToMilliSeconds(s, steady_clock::now()) >= interval) { |
726 | return false; |
727 | } |
728 | } |
729 | |
730 | return false; |
731 | } |
732 | |
/**
 * Unwrap a server-reported HdfsRpcServerException into the specific
 * client-side exception type it encodes (standby namenode, access
 * control, safemode, ...) when it is one of the recognized classes;
 * otherwise return the input unchanged.
 * @param e exception raised while handling an RPC response.
 * @return the unwrapped exception, or e itself when unrecognized.
 */
static exception_ptr HandlerRpcResponseException(exception_ptr e) {
    exception_ptr retval = e;

    try {
        rethrow_exception(e);
    } catch (const HdfsRpcServerException & e) {
        UnWrapper < NameNodeStandbyException, RpcNoSuchMethodException, UnsupportedOperationException,
                  AccessControlException, SafeModeException, SaslException > unwrapper(e);

        try {
            // unwrap() rethrows as the most specific matching type.
            unwrapper.unwrap(__FILE__, __LINE__);
        } catch (const NameNodeStandbyException & e) {
            retval = current_exception();
        } catch (const UnsupportedOperationException & e) {
            retval = current_exception();
        } catch (const AccessControlException & e) {
            retval = current_exception();
        } catch (const SafeModeException & e) {
            retval = current_exception();
        } catch (const SaslException & e) {
            retval = current_exception();
        } catch (const RpcNoSuchMethodException & e) {
            retval = current_exception();
        } catch (const HdfsIOException & e) {
            // Not a recognized specific type: keep the original exception.
        }
    }

    return retval;
}
762 | |
763 | void RpcChannelImpl::readOneResponse(bool writeLock) { |
764 | int readTimeout = key.getConf().getReadTimeout(); |
765 | std::vector<char> buffer(128); |
766 | RpcResponseHeaderProto ; |
767 | RpcResponseHeaderProto::RpcStatusProto status; |
768 | uint32_t = 0, bodySize = 0; |
769 | in->readBigEndianInt32(readTimeout); |
770 | /* |
771 | * read response header |
772 | */ |
773 | headerSize = in->readVarint32(readTimeout); |
774 | buffer.resize(headerSize); |
775 | in->readFully(&buffer[0], headerSize, readTimeout); |
776 | |
777 | if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) { |
778 | THROW(HdfsRpcException, |
779 | "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header." , |
780 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str()) |
781 | } |
782 | |
783 | lastActivity = steady_clock::now(); |
784 | status = curRespHeader.status(); |
785 | |
786 | if (RpcResponseHeaderProto_RpcStatusProto_SUCCESS == status) { |
787 | /* |
788 | * on success, read response body |
789 | */ |
790 | RpcRemoteCallPtr rc; |
791 | |
792 | if (writeLock) { |
793 | lock_guard<mutex> lock(writeMut); |
794 | rc = getPendingCall(curRespHeader.callid()); |
795 | } else { |
796 | rc = getPendingCall(curRespHeader.callid()); |
797 | } |
798 | |
799 | bodySize = in->readVarint32(readTimeout); |
800 | buffer.resize(bodySize); |
801 | |
802 | if (bodySize > 0) { |
803 | in->readFully(&buffer[0], bodySize, readTimeout); |
804 | } |
805 | |
806 | Message * response = rc->getCall().getResponse(); |
807 | |
808 | if (!response->ParseFromArray(&buffer[0], bodySize)) { |
809 | THROW(HdfsRpcException, |
810 | "RPC channel to \"%s:%s\" got protocol mismatch: rpc channel cannot parse response." , |
811 | key.getServer().getHost().c_str(), key.getServer().getPort().c_str()) |
812 | } |
813 | |
814 | rc->done(); |
815 | } else { |
816 | /* |
817 | * on error, read error class and message |
818 | */ |
819 | std::string errClass, errMessage; |
820 | errClass = curRespHeader.exceptionclassname(); |
821 | errMessage = curRespHeader.errormsg(); |
822 | |
823 | if (RpcResponseHeaderProto_RpcStatusProto_ERROR == status) { |
824 | RpcRemoteCallPtr rc; |
825 | { |
826 | lock_guard<mutex> lock(writeMut); |
827 | rc = getPendingCall(curRespHeader.callid()); |
828 | } |
829 | |
830 | try { |
831 | THROW(HdfsRpcServerException, "%s: %s" , |
832 | errClass.c_str(), errMessage.c_str()); |
833 | } catch (HdfsRpcServerException & e) { |
834 | e.setErrClass(errClass); |
835 | e.setErrMsg(errMessage); |
836 | rc->cancel(HandlerRpcResponseException(current_exception())); |
837 | } |
838 | } else { /*fatal*/ |
839 | assert(RpcResponseHeaderProto_RpcStatusProto_FATAL == status); |
840 | |
841 | if (errClass.empty()) { |
842 | THROW(HdfsRpcException, "%s: %s" , |
843 | errClass.c_str(), errMessage.c_str()); |
844 | } |
845 | |
846 | try { |
847 | THROW(HdfsRpcServerException, "%s: %s" , errClass.c_str(), |
848 | errMessage.c_str()); |
849 | } catch (HdfsRpcServerException & e) { |
850 | e.setErrClass(errClass); |
851 | e.setErrMsg(errMessage); |
852 | rethrow_exception(HandlerRpcResponseException(current_exception())); |
853 | } |
854 | } |
855 | } |
856 | } |
857 | |
858 | } |
859 | } |
860 | |