/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "Exception.h"
#include "ExceptionInternal.h"
#include "Logger.h"
#include "Memory.h"
#include "RpcClient.h"
#include "Thread.h"

#include <boost/uuid/uuid_generators.hpp>

#include <cstring>  /* memcpy, used to copy the generated client id */

namespace Hdfs {
namespace Internal {

once_flag RpcClient::once;
shared_ptr<RpcClient> RpcClient::client;

void RpcClient::createSinglten() {
    client = shared_ptr<RpcClient>(new RpcClientImpl());
}

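/*
 * Return the process-wide RpcClient singleton, creating it exactly once in
 * a thread-safe manner via call_once().
 */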
RpcClient & RpcClient::getClient() {
    call_once(once, &RpcClientImpl::createSinglten);
    assert(client);
    return *client;
}

RpcClientImpl::RpcClientImpl() :
    cleaning(false), running(true), count(0) {
    /* Generate a random UUID and keep its raw bytes as this client's id. */
    auto id = boost::uuids::random_generator()();
    clientId.resize(boost::uuids::uuid::static_size());
    memcpy(&clientId[0], id.begin(), boost::uuids::uuid::static_size());
#ifdef MOCK
    stub = NULL;
#endif
}

RpcClientImpl::~RpcClientImpl() {
    /* Stop the idle cleaner thread before tearing down the channels. */
    running = false;
    cond.notify_all();

    if (cleaner.joinable()) {
        cleaner.join();
    }

    close();
}

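/*
 * Body of the idle cleaner thread: wake up roughly once per second and
 * erase any cached channel that RpcChannel::checkIdle() reports as idle.
 * The loop exits when the client stops running or no channels remain;
 * getChannel() restarts the thread on demand.
 */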
void RpcClientImpl::clean() {
    assert(cleaning);

    try {
        while (running) {
            try {
                unique_lock<mutex> lock(mut);
                cond.wait_for(lock, seconds(1));

                if (!running || allChannels.empty()) {
                    break;
                }

                /* Erase every channel that reports itself idle. */
                unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
                e = allChannels.end();

                for (s = allChannels.begin(); s != e;) {
                    if (s->second->checkIdle()) {
                        s->second.reset();
                        s = allChannels.erase(s);
                    } else {
                        ++s;
                    }
                }
            } catch (const HdfsCanceled &) {
                /*
                 * Ignore the cancel signal here; the loop condition decides
                 * when to stop.
                 */
            }
        }
    } catch (const Hdfs::HdfsException & e) {
        std::string buffer;
        LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s",
            GetExceptionDetail(e, buffer));
    } catch (const std::exception & e) {
        LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", e.what());
    }

    cleaning = false;
}

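/*
 * Shut the client down: mark it as not running, wait for each channel's
 * pending work to finish, then drop all cached channels.
 */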
void RpcClientImpl::close() {
    lock_guard<mutex> lock(mut);
    running = false;
    unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
    e = allChannels.end();

    for (s = allChannels.begin(); s != e; ++s) {
        s->second->waitForExit();
    }

    allChannels.clear();
}

bool RpcClientImpl::isRunning() {
    return running;
}

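/*
 * Return a channel for the given (auth, protocol, server, conf) key,
 * reusing a cached channel when one exists and creating one otherwise.
 * Each call adds a reference to the channel, and the idle cleaner thread
 * is (re)started lazily if it is not currently running.
 */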
RpcChannel & RpcClientImpl::getChannel(const RpcAuth & auth,
    const RpcProtocolInfo & protocol, const RpcServerInfo & server,
    const RpcConfig & conf) {
    shared_ptr<RpcChannel> rc;
    RpcChannelKey key(auth, protocol, server, conf);

    try {
        lock_guard<mutex> lock(mut);

        if (!running) {
            THROW(Hdfs::HdfsRpcException,
                  "Cannot set up RPC channel to \"%s:%s\" since RpcClient is closing",
                  key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
        }

        /* Reuse a cached channel for this key if one exists. */
        unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator it;
        it = allChannels.find(key);

        if (it != allChannels.end()) {
            rc = it->second;
        } else {
            rc = createChannelInternal(key);
            allChannels[key] = rc;
        }

        rc->addRef();

        /* Restart the idle cleaner thread if it has exited. */
        if (!cleaning) {
            cleaning = true;

            if (cleaner.joinable()) {
                cleaner.join();
            }

            CREATE_THREAD(cleaner, bind(&RpcClientImpl::clean, this));
        }
    } catch (const HdfsRpcException &) {
        throw;
    } catch (...) {
        NESTED_THROW(HdfsRpcException,
                     "RpcClient failed to create a channel to \"%s:%s\"",
                     server.getHost().c_str(), server.getPort().c_str());
    }

    return *rc;
}

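/*
 * Construct a new channel for the given key. In MOCK builds a test stub may
 * supply the channel; otherwise a concrete RpcChannelImpl is created.
 */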
shared_ptr<RpcChannel> RpcClientImpl::createChannelInternal(
    const RpcChannelKey & key) {
    shared_ptr<RpcChannel> channel;
#ifdef MOCK

    if (stub) {
        channel = shared_ptr<RpcChannel>(stub->getChannel(key, *this));
    } else {
        channel = shared_ptr<RpcChannel>(new RpcChannelImpl(key, *this));
    }

#else
    channel = shared_ptr<RpcChannel>(new RpcChannelImpl(key, *this));
#endif
    return channel;
}

}  /* namespace Internal */
}  /* namespace Hdfs */