1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
29#define _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
30
31#include "Atomic.h"
32#include "DateTime.h"
33#include "ExceptionInternal.h"
34#include "IpcConnectionContext.pb.h"
35#include "Memory.h"
36#include "network/BufferedSocketReader.h"
37#include "network/TcpSocket.h"
38#include "RpcCall.h"
39#include "RpcChannelKey.h"
40#include "RpcHeader.pb.h"
41#include "RpcRemoteCall.h"
42#include "SaslClient.h"
43#include "Thread.h"
44#include "Unordered.h"
45
46#include <google/protobuf/message.h>
47
48namespace Hdfs {
49namespace Internal {
50
51class RpcClient;
52
53class RpcChannel {
54public:
55 /**
56 * Destroy a channel
57 */
58 virtual ~RpcChannel() {
59 }
60
61 /**
62 * The caller finished the rpc call,
63 * this channel may be reused later if immediate is false.
64 * @param immediate Do not reuse the channel any more if immediate is true.
65 */
66 virtual void close(bool immediate) = 0;
67
68 /**
69 * Invoke a rpc call.
70 * @param call The call is to be invoked.
71 * @return The remote call object.
72 */
73 virtual void invoke(const RpcCall & call) = 0;
74
75 /**
76 * Close the channel if it idle expired.
77 * @return true if the channel idle expired.
78 */
79 virtual bool checkIdle() = 0;
80
81 /**
82 * Wait for all reference exiting.
83 * The channel cannot be reused any more.
84 * @pre RpcClient is not running.
85 */
86 virtual void waitForExit() = 0;
87
88 /**
89 * Add reference count to this channel.
90 */
91 virtual void addRef() = 0;
92};
93
/**
 * RpcChannelImpl represents an RPC connection to the server.
 */
97class RpcChannelImpl: public RpcChannel {
98public:
99 /**
100 * Construct a RpcChannelImpl instance.
101 * @param k The key of this channel.
102 */
103 RpcChannelImpl(const RpcChannelKey & k, RpcClient & c);
104
105 /**
106 * Destroy a RpcChannelImpl instance.
107 */
108 ~RpcChannelImpl();
109
110 /**
111 * The caller finished the rpc call,
112 * this channel may be reused later if immediate is false.
113 * @param immediate Do not reuse the channel any more if immediate is true.
114 */
115 void close(bool immediate);
116
117 /**
118 * Invoke a rpc call.
119 * @param call The call is to be invoked.
120 * @return The remote call object.
121 */
122 void invoke(const RpcCall & call);
123
124 /**
125 * Close the channel if it idle expired.
126 * @return true if the channel idle expired.
127 */
128 bool checkIdle();
129
130 /**
131 * Wait for all reference exiting.
132 * The channel cannot be reused any more.
133 * @pre RpcClient is not running.
134 */
135 void waitForExit();
136
137 /**
138 * Add reference count to this channel.
139 */
140 void addRef() {
141 ++refs;
142 }
143
144private:
145 /**
146 * Setup the RPC connection.
147 * @pre Already hold write lock.
148 */
149 void connect();
150
151 /**
152 * Cleanup all pending calls.
153 * @param reason The reason to cancel the call.
154 * @pre Already hold write lock.
155 */
156 void cleanupPendingCalls(exception_ptr reason);
157
158 /**
159 * Send rpc connect protocol header.
160 * @throw HdfsNetworkException
161 * @throw HdfsTimeout
162 */
163 void sendConnectionHeader(const RpcAuth& auth);
164
165 /**
166 * Send rpc connection protocol content.
167 */
168 void sendConnectionContent(const RpcAuth & auth);
169
170 /**
171 * Build rpc connect context.
172 */
173 void buildConnectionContext(IpcConnectionContextProto & connectionContext, const RpcAuth & auth);
174
175 /**
176 * Send ping packet to server.
177 * @throw HdfsNetworkException
178 * @throw HdfsTimeout
179 * @pre Caller should hold the write lock.
180 */
181 void sendPing();
182
183 /**
184 * Send the call message to rpc server.
185 * @param remote The remote call.
186 * @pre Already hold write lock.
187 */
188 void sendRequest(RpcRemoteCallPtr remote);
189
190 /**
191 * Issue a rpc call and check response.
192 * Catch all recoverable error in this function
193 *
194 * @param remote The remote call
195 */
196 exception_ptr invokeInternal(RpcRemoteCallPtr remote);
197
198 /**
199 * Check response, block until get one response.
200 * @pre Channel already hold read lock.
201 */
202 void checkOneResponse();
203
204 /**
205 * read and handle one response.
206 * @pre Channel already hold read lock.
207 */
208 void readOneResponse(bool writeLock);
209
210 /**
211 * Get the call object with given id, and then remove it from pending call list.
212 * @param id The id of the call object to be returned.
213 * @return The call object with given id.
214 * @throw HdfsIOException
215 * @pre Channel already locked.
216 */
217 RpcRemoteCallPtr getPendingCall(int32_t id);
218
219 /**
220 * Check if there is data available for reading on socket.
221 * @return true if response is available.
222 */
223 bool getResponse();
224
225 /**
226 * wake up one caller to check response.
227 * @param id The call id which current caller handled.
228 */
229 void wakeupOneCaller(int32_t id);
230
231 /**
232 * shutdown the RPC connection since error.
233 * @param reason The reason to cancel the call
234 * @pre Already hold write lock.
235 */
236 void shutdown(exception_ptr reason);
237
238 const RpcSaslProto_SaslAuth * createSaslClient(
239 const ::google::protobuf::RepeatedPtrField<RpcSaslProto_SaslAuth> * auths);
240
241 void sendSaslMessage(RpcSaslProto * msg, ::google::protobuf::Message * resp);
242
243 std::string saslEvaluateToken(RpcSaslProto & response, bool serverIsDone);
244
245 RpcAuth setupSaslConnection();
246
247private:
248 /**
249 * Construct a RpcChannelImpl instance for test.
250 * @param key The key of this channel.
251 * @param sock The socket instance.
252 * @param in The BufferedSocketReader instance build on sock.
253 * @param client The RpcClient instance.
254 */
255 RpcChannelImpl(const RpcChannelKey & key, Socket * sock,
256 BufferedSocketReader * in, RpcClient & client);
257
258private:
259 atomic<int> refs;
260 bool available;
261 mutex readMut;
262 mutex writeMut;
263 RpcChannelKey key;
264 RpcClient & client;
265 shared_ptr<BufferedSocketReader> in;
266 shared_ptr<SaslClient> saslClient;
267 shared_ptr<Socket> sock;
268 steady_clock::time_point lastActivity; // ping is a kind of activity, lastActivity will be updated after ping
269 steady_clock::time_point lastIdle; // ping cannot change idle state. If there is still pending calls, lastIdle is always "NOW".
270 unordered_map<int32_t, RpcRemoteCallPtr> pendingCalls;
271};
272
273}
274}
275
276#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_ */
277