1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_
29#define _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_
30
31#include "DateTime.h"
32#include "ExceptionInternal.h"
33#include "Memory.h"
34#include "RpcCall.h"
35#include "RpcProtocolInfo.h"
36#include "Thread.h"
37#include "WriteBuffer.h"
38
/*
 * Sentinel value (-1). Judging by its name it marks a retry count that is
 * not valid; the code that uses it lives outside this header.
 * The expansion is parenthesized so it always behaves as a single operand,
 * whatever expression the macro is pasted into.
 */
#define INVALID_RETRY_COUNT (-1)
40
41namespace Hdfs {
42namespace Internal {
43
44class RpcRemoteCall;
45typedef shared_ptr<RpcRemoteCall> RpcRemoteCallPtr;
46
47class RpcRemoteCall {
48public:
49 RpcRemoteCall(const RpcCall & c, int32_t id, const std::string & clientId) :
50 complete(false), identity(id), call(c), clientId(clientId) {
51 }
52 virtual ~RpcRemoteCall() {
53 }
54
55 virtual void cancel(exception_ptr reason) {
56 unique_lock<mutex> lock(mut);
57 complete = true;
58 error = reason;
59 cond.notify_all();
60 }
61
62 virtual void serialize(const RpcProtocolInfo & protocol,
63 WriteBuffer & buffer);
64
65 const int32_t getIdentity() const {
66 return identity;
67 }
68
69 void wait() {
70 unique_lock<mutex> lock(mut);
71
72 if (!complete) {
73 cond.wait_for(lock, milliseconds(500));
74 }
75 }
76
77 void check() {
78 if (error != exception_ptr()) {
79 rethrow_exception(error);
80 }
81 }
82
83 RpcCall & getCall() {
84 return call;
85 }
86
87 void done() {
88 unique_lock<mutex> lock(mut);
89 complete = true;
90 cond.notify_all();
91 }
92
93 void wakeup() {
94 cond.notify_all();
95 }
96
97 bool finished() {
98 unique_lock<mutex> lock(mut);
99 return complete;
100 }
101
102public:
103 static std::vector<char> GetPingRequest(const std::string & clientid);
104
105private:
106 bool complete;
107 condition_variable cond;
108 const int32_t identity;
109 exception_ptr error;
110 mutex mut;
111 RpcCall call;
112 std::string clientId;
113};
114
115}
116}
117
118#endif /* _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_ */
119