/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
28#include "ClientDatanodeProtocol.pb.h"
29#include "Datanode.h"
30#include "Exception.h"
31#include "ExceptionInternal.h"
32#include "RpcHelper.h"
33
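// Identity of the datanode-facing RPC protocol: the protocol version, the
// fully qualified Java protocol name, and the token kind carried by HDFS
// block tokens.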
#define DATANODE_VERSION 1
#define DATANODE_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol"
#define BLOCK_TOKEN_KIND "HDFS_BLOCK_TOKEN"

using namespace google::protobuf;

namespace Hdfs {
namespace Internal {

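/**
 * Construct a client-side proxy for the ClientDatanodeProtocol of the
 * datanode at host:port. No connection is opened here; an RPC channel is
 * obtained from the shared RpcClient on every invoke().
 *
 * Usage sketch (configuration and authentication setup are assumed;
 * 50020 is the default datanode IPC port):
 *
 *   DatanodeImpl dn("dn1.example.com", 50020, conf, auth);
 *   int64_t len = dn.getReplicaVisibleLength(block);
 */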
DatanodeImpl::DatanodeImpl(const std::string & host, uint32_t port,
                           const SessionConfig & c, const RpcAuth & a) :
    auth(a), client(RpcClient::getClient()), conf(c), protocol(
        DATANODE_VERSION, DATANODE_PROTOCOL, BLOCK_TOKEN_KIND), server(host, port) {
    server.setTokenService("");
}

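/**
 * Issue one RPC call to the datanode over a channel obtained from the
 * shared RpcClient. On failure the channel is torn down before the
 * exception propagates; on success it is closed or kept alive for later
 * calls according to the reuse flag.
 */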
void DatanodeImpl::invoke(const RpcCall & call, bool reuse) {
    RpcChannel & channel = client.getChannel(auth, protocol, server, conf);

    try {
        channel.invoke(call);
    } catch (const HdfsFailoverException & e) {
        // Datanodes do not have an HA configuration, so a failover
        // exception here can only be a wrapper around the real cause.
        channel.close(true);
        Hdfs::rethrow_if_nested(e);
        assert(false && "HdfsFailoverException should always wrap another exception");
    } catch (...) {
        channel.close(true);
        throw;
    }

    channel.close(!reuse);
}

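/**
 * Ask the datanode how many bytes of the given block's replica are
 * visible to readers. A server-side ReplicaNotFoundException is unwrapped
 * and rethrown; other RPC errors surface as HdfsIOException.
 */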
int64_t DatanodeImpl::getReplicaVisibleLength(const ExtendedBlock & b) {
    try {
        GetReplicaVisibleLengthRequestProto request;
        GetReplicaVisibleLengthResponseProto response;
        Build(b, request.mutable_block());
        invoke(RpcCall(true, "getReplicaVisibleLength", &request, &response), false);
        return response.length();
    } catch (const HdfsRpcServerException & e) {
        // Rethrow the server-side exception as the matching client exception.
        UnWrapper<ReplicaNotFoundException, HdfsIOException> unwrapper(e);
        unwrapper.unwrap(__FILE__, __LINE__);
    }

    return 0;  // never reached: unwrap() always throws
}

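/**
 * Ask the datanode for the local file system paths of the block file and
 * its metadata file, as used by short-circuit local reads. The block
 * token authorizes the request.
 */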
void DatanodeImpl::getBlockLocalPathInfo(const ExtendedBlock & block,
        const Token & token, BlockLocalPathInfo & info) {
    try {
        ExtendedBlock eb;
        GetBlockLocalPathInfoRequestProto request;
        GetBlockLocalPathInfoResponseProto response;
        Build(block, request.mutable_block());
        Build(token, request.mutable_token());
        invoke(RpcCall(true, "getBlockLocalPathInfo", &request, &response), true);
        Convert(eb, response.block());
        info.setBlock(eb);
        info.setLocalBlockPath(response.localpath().c_str());
        info.setLocalMetaPath(response.localmetapath().c_str());
    } catch (const HdfsRpcServerException & e) {
        UnWrapper<ReplicaNotFoundException, HdfsIOException> unwrapper(e);
        unwrapper.unwrap(__FILE__, __LINE__);
    }
}

} /* namespace Internal */
} /* namespace Hdfs */