1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_
29#define _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_
30
31#include "client/FileStatus.h"
32#include "client/Permission.h"
33#include "ClientDatanodeProtocol.pb.h"
34#include "ClientNamenodeProtocol.pb.h"
35#include "DatanodeInfo.h"
36#include "Exception.h"
37#include "ExceptionInternal.h"
38#include "ExtendedBlock.h"
39#include "LocatedBlock.h"
40#include "LocatedBlocks.h"
41#include "StackPrinter.h"
42
43#include <algorithm>
44#include <cassert>
45
46using namespace google::protobuf;
47
48namespace Hdfs {
49namespace Internal {
50
/**
 * Empty placeholder type. It is the default for every template parameter
 * of UnWrapper, and an UnWrapper made only of Nothing is the explicit
 * specialization that ends the exception type list.
 */
class Nothing {};
53
54template < typename T1 = Nothing, typename T2 = Nothing, typename T3 = Nothing,
55 typename T4 = Nothing, typename T5 = Nothing, typename T6 = Nothing,
56 typename T7 = Nothing, typename T8 = Nothing, typename T9 = Nothing,
57 typename T10 = Nothing, typename T11 = Nothing >
58class UnWrapper: public UnWrapper<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Nothing> {
59private:
60 typedef UnWrapper<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Nothing> BaseType;
61
62public:
63 UnWrapper(const HdfsRpcServerException & e) :
64 BaseType(e), e(e) {
65 }
66
67 void ATTRIBUTE_NORETURN ATTRIBUTE_NOINLINE unwrap(const char * file,
68 int line) {
69 if (e.getErrClass() == T1::ReflexName) {
70#ifdef NEED_BOOST
71 boost::throw_exception(T1(e.getErrMsg(), SkipPathPrefix(file), line, PrintStack(1, STACK_DEPTH).c_str()));
72#else
73 throw T1(e.getErrMsg(), SkipPathPrefix(file), line, PrintStack(1, STACK_DEPTH).c_str());
74#endif
75 } else {
76 BaseType::unwrap(file, line);
77 }
78 }
79private:
80 const HdfsRpcServerException & e;
81};
82
83template<>
84class UnWrapper < Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing,
85 Nothing, Nothing, Nothing, Nothing > {
86public:
87 UnWrapper(const HdfsRpcServerException & e) :
88 e(e) {
89 }
90 void ATTRIBUTE_NORETURN ATTRIBUTE_NOINLINE unwrap(const char * file,
91 int line) {
92 THROW(HdfsIOException,
93 "Unexpected exception: when unwrap the rpc remote exception \"%s\", %s in %s: %d",
94 e.getErrClass().c_str(), e.getErrMsg().c_str(), file, line);
95 }
96private:
97 const HdfsRpcServerException & e;
98};
99
100static inline void Convert(ExtendedBlock & eb,
101 const ExtendedBlockProto & proto) {
102 eb.setBlockId(proto.blockid());
103 eb.setGenerationStamp(proto.generationstamp());
104 eb.setNumBytes(proto.numbytes());
105 eb.setPoolId(proto.poolid());
106}
107
108static inline void Convert(Token & token,
109 const TokenProto & proto) {
110 token.setIdentifier(proto.identifier());
111 token.setKind(proto.kind());
112 token.setPassword(proto.password());
113 token.setService(proto.service());
114}
115
116static inline void Convert(DatanodeInfo & node,
117 const DatanodeInfoProto & proto) {
118 const DatanodeIDProto & idProto = proto.id();
119 node.setHostName(idProto.hostname());
120 node.setInfoPort(idProto.infoport());
121 node.setIpAddr(idProto.ipaddr());
122 node.setIpcPort(idProto.ipcport());
123 node.setDatanodeId(idProto.datanodeuuid());
124 node.setXferPort(idProto.xferport());
125 node.setLocation(proto.location());
126}
127
128static inline shared_ptr<LocatedBlock> Convert(const LocatedBlockProto & proto) {
129 Token token;
130 shared_ptr<LocatedBlock> lb(new LocatedBlock);
131 Convert(token, proto.blocktoken());
132 lb->setToken(token);
133 std::vector<DatanodeInfo> & nodes = lb->mutableLocations();
134 nodes.resize(proto.locs_size());
135
136 for (int i = 0 ; i < proto.locs_size(); ++i) {
137 Convert(nodes[i], proto.locs(i));
138 }
139
140 if (proto.storagetypes_size() > 0) {
141 assert(proto.storagetypes_size() == proto.locs_size());
142 std::vector<std::string> & storageIDs = lb->mutableStorageIDs();
143 storageIDs.resize(proto.storagetypes_size());
144
145 for (int i = 0; i < proto.storagetypes_size(); ++i) {
146 storageIDs[i] = proto.storageids(i);
147 }
148 }
149
150 Convert(*lb, proto.b());
151 lb->setOffset(proto.offset());
152 lb->setCorrupt(proto.corrupt());
153 return lb;
154}
155
156static inline void Convert(LocatedBlocks & lbs,
157 const LocatedBlocksProto & proto) {
158 shared_ptr<LocatedBlock> lb;
159 lbs.setFileLength(proto.filelength());
160 lbs.setIsLastBlockComplete(proto.islastblockcomplete());
161 lbs.setUnderConstruction(proto.underconstruction());
162
163 if (proto.has_lastblock()) {
164 lb = Convert(proto.lastblock());
165 lbs.setLastBlock(lb);
166 }
167
168 std::vector<LocatedBlock> & blocks = lbs.getBlocks();
169 blocks.resize(proto.blocks_size());
170
171 for (int i = 0; i < proto.blocks_size(); ++i) {
172 blocks[i] = *Convert(proto.blocks(i));
173 }
174
175 std::sort(blocks.begin(), blocks.end(), std::less<LocatedBlock>());
176}
177
178static inline void Convert(const std::string & src, FileStatus & fs,
179 const HdfsFileStatusProto & proto) {
180 fs.setAccessTime(proto.access_time());
181 fs.setBlocksize(proto.blocksize());
182 fs.setGroup(proto.group().c_str());
183 fs.setLength(proto.length());
184 fs.setModificationTime(proto.modification_time());
185 fs.setOwner(proto.owner().c_str());
186 fs.setPath((src + "/" + proto.path()).c_str());
187 fs.setReplication(proto.block_replication());
188 fs.setSymlink(proto.symlink().c_str());
189 fs.setPermission(Permission(proto.permission().perm()));
190 fs.setIsdir(proto.filetype() == HdfsFileStatusProto::IS_DIR);
191}
192
193static inline void Convert(const std::string & src,
194 std::vector<FileStatus> & dl,
195 const DirectoryListingProto & proto) {
196 RepeatedPtrField<HdfsFileStatusProto> ptrproto = proto.partiallisting();
197
198 for (int i = 0; i < ptrproto.size(); i++) {
199 FileStatus fileStatus;
200 Convert(src, fileStatus, ptrproto.Get(i));
201 dl.push_back(fileStatus);
202 }
203}
204
205static inline Token Convert(const TokenProto & proto) {
206 Token retval;
207 retval.setIdentifier(proto.identifier());
208 retval.setKind(proto.kind());
209 retval.setPassword(proto.password());
210 return retval;
211}
212
213/*static inline void Convert(ContentSummary & contentSummary, const ContentSummaryProto & proto) {
214 contentSummary.setDirectoryCount(proto.directorycount());
215 contentSummary.setFileCount(proto.filecount());
216 contentSummary.setLength(proto.length());
217 contentSummary.setQuota(proto.quota());
218 contentSummary.setSpaceConsumed(proto.spaceconsumed());
219 contentSummary.setSpaceQuota(proto.spacequota());
220}*/
221
222static inline void Build(const Token & token,
223 TokenProto * proto) {
224 proto->set_identifier(token.getIdentifier());
225 proto->set_kind(token.getKind());
226 proto->set_password(token.getPassword());
227 proto->set_service(token.getService());
228}
229
230static inline void Build(const Permission & p, FsPermissionProto * proto) {
231 proto->set_perm(p.toShort());
232}
233
234static inline void Build(const DatanodeInfo & dn, DatanodeIDProto * proto) {
235 proto->set_hostname(dn.getHostName());
236 proto->set_infoport(dn.getInfoPort());
237 proto->set_ipaddr(dn.getIpAddr());
238 proto->set_ipcport(dn.getIpcPort());
239 proto->set_datanodeuuid(dn.getDatanodeId());
240 proto->set_xferport(dn.getXferPort());
241}
242
243static inline void Build(const std::vector<DatanodeInfo> & dns,
244 RepeatedPtrField<DatanodeInfoProto> * proto) {
245 for (size_t i = 0; i < dns.size(); ++i) {
246 DatanodeInfoProto * p = proto->Add();
247 Build(dns[i], p->mutable_id());
248 p->set_location(dns[i].getLocation());
249 }
250}
251
252static inline void Build(const ExtendedBlock & eb, ExtendedBlockProto * proto) {
253 proto->set_blockid(eb.getBlockId());
254 proto->set_generationstamp(eb.getGenerationStamp());
255 proto->set_numbytes(eb.getNumBytes());
256 proto->set_poolid(eb.getPoolId());
257}
258
259static inline void Build(LocatedBlock & b, LocatedBlockProto * proto) {
260 proto->set_corrupt(b.isCorrupt());
261 proto->set_offset(b.getOffset());
262 Build(b, proto->mutable_b());
263 Build(b.getLocations(), proto->mutable_locs());
264}
265
266/*static inline void Build(const std::vector<LocatedBlock> & blocks,
267 RepeatedPtrField<LocatedBlockProto> * proto) {
268 for (size_t i = 0; i < blocks.size(); ++i) {
269 LocatedBlockProto * p = proto->Add();
270 p->set_corrupt(blocks[i].isCorrupt());
271 p->set_offset(blocks[i].getOffset());
272 Build(blocks[i], p->mutable_b());
273 }
274}*/
275
276static inline void Build(const std::vector<std::string> & srcs,
277 RepeatedPtrField<std::string> * proto) {
278 for (size_t i = 0; i < srcs.size(); ++i) {
279 proto->Add()->assign(srcs[i]);
280 }
281}
282
283static inline void Build(const std::vector<DatanodeInfo> & dns,
284 RepeatedPtrField<DatanodeIDProto> * proto) {
285 for (size_t i = 0; i < dns.size(); ++i) {
286 Build(dns[i], proto->Add());
287 }
288}
289
290}
291}
292
293#endif /* _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_ */
294