1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "client/Token.h"
29#include "datatransfer.pb.h"
30#include "DataTransferProtocolSender.h"
31#include "Exception.h"
32#include "ExceptionInternal.h"
33#include "hdfs.pb.h"
34#include "Security.pb.h"
35#include "WriteBuffer.h"
36
37using namespace google::protobuf;
38
39namespace Hdfs {
40namespace Internal {
41
42static inline void Send(Socket & sock, DataTransferOp op, Message * msg,
43 int writeTimeout) {
44 WriteBuffer buffer;
45 buffer.writeBigEndian(static_cast<int16_t>(DATA_TRANSFER_VERSION));
46 buffer.write(static_cast<char>(op));
47 int msgSize = msg->ByteSize();
48 buffer.writeVarint32(msgSize);
49 char * b = buffer.alloc(msgSize);
50
51 if (!msg->SerializeToArray(b, msgSize)) {
52 THROW(HdfsIOException,
53 "DataTransferProtocolSender cannot serialize header to send buffer.");
54 }
55
56 sock.writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout);
57}
58
59static inline void BuildBaseHeader(const ExtendedBlock & block,
60 const Token & accessToken, BaseHeaderProto * header) {
61 ExtendedBlockProto * eb = header->mutable_block();
62 TokenProto * token = header->mutable_token();
63 eb->set_blockid(block.getBlockId());
64 eb->set_generationstamp(block.getGenerationStamp());
65 eb->set_numbytes(block.getNumBytes());
66 eb->set_poolid(block.getPoolId());
67 token->set_identifier(accessToken.getIdentifier());
68 token->set_password(accessToken.getPassword());
69 token->set_kind(accessToken.getKind());
70 token->set_service(accessToken.getService());
71}
72
73static inline void BuildClientHeader(const ExtendedBlock & block,
74 const Token & accessToken, const char * clientName,
75 ClientOperationHeaderProto * header) {
76 header->set_clientname(clientName);
77 BuildBaseHeader(block, accessToken, header->mutable_baseheader());
78}
79
80static inline void BuildNodeInfo(const DatanodeInfo & node,
81 DatanodeInfoProto * info) {
82 DatanodeIDProto * id = info->mutable_id();
83 id->set_hostname(node.getHostName());
84 id->set_infoport(node.getInfoPort());
85 id->set_ipaddr(node.getIpAddr());
86 id->set_ipcport(node.getIpcPort());
87 id->set_datanodeuuid(node.getDatanodeId());
88 id->set_xferport(node.getXferPort());
89 info->set_location(node.getLocation());
90}
91
92static inline void BuildNodesInfo(const std::vector<DatanodeInfo> & nodes,
93 RepeatedPtrField<DatanodeInfoProto> * infos) {
94 for (std::size_t i = 0; i < nodes.size(); ++i) {
95 BuildNodeInfo(nodes[i], infos->Add());
96 }
97}
98
99DataTransferProtocolSender::DataTransferProtocolSender(Socket & sock,
100 int writeTimeout, const std::string & datanodeAddr) :
101 sock(sock), writeTimeout(writeTimeout), datanode(datanodeAddr) {
102}
103
104DataTransferProtocolSender::~DataTransferProtocolSender() {
105}
106
107void DataTransferProtocolSender::readBlock(const ExtendedBlock & blk,
108 const Token & blockToken, const char * clientName,
109 int64_t blockOffset, int64_t length) {
110 try {
111 OpReadBlockProto op;
112 op.set_len(length);
113 op.set_offset(blockOffset);
114 BuildClientHeader(blk, blockToken, clientName, op.mutable_header());
115 Send(sock, READ_BLOCK, &op, writeTimeout);
116 } catch (const HdfsCanceled & e) {
117 throw;
118 } catch (const HdfsException & e) {
119 NESTED_THROW(HdfsIOException,
120 "DataTransferProtocolSender cannot send read request to datanode %s.",
121 datanode.c_str());
122 }
123}
124
125void DataTransferProtocolSender::writeBlock(const ExtendedBlock & blk,
126 const Token & blockToken, const char * clientName,
127 const std::vector<DatanodeInfo> & targets, int stage, int pipelineSize,
128 int64_t minBytesRcvd, int64_t maxBytesRcvd,
129 int64_t latestGenerationStamp, int checksumType, int bytesPerChecksum) {
130 try {
131 OpWriteBlockProto op;
132 op.set_latestgenerationstamp(latestGenerationStamp);
133 op.set_minbytesrcvd(minBytesRcvd);
134 op.set_maxbytesrcvd(maxBytesRcvd);
135 op.set_pipelinesize(targets.size());
136 op.set_stage((OpWriteBlockProto_BlockConstructionStage) stage);
137 BuildClientHeader(blk, blockToken, clientName, op.mutable_header());
138 ChecksumProto * ck = op.mutable_requestedchecksum();
139 ck->set_bytesperchecksum(bytesPerChecksum);
140 ck->set_type((ChecksumTypeProto) checksumType);
141 BuildNodesInfo(targets, op.mutable_targets());
142 Send(sock, WRITE_BLOCK, &op, writeTimeout);
143 } catch (const HdfsCanceled & e) {
144 throw;
145 } catch (const HdfsException & e) {
146 NESTED_THROW(HdfsIOException,
147 "DataTransferProtocolSender cannot send write request to datanode %s.",
148 datanode.c_str());
149 }
150}
151
152void DataTransferProtocolSender::transferBlock(const ExtendedBlock & blk,
153 const Token & blockToken, const char * clientName,
154 const std::vector<DatanodeInfo> & targets) {
155 try {
156 OpTransferBlockProto op;
157 BuildClientHeader(blk, blockToken, clientName, op.mutable_header());
158 BuildNodesInfo(targets, op.mutable_targets());
159 Send(sock, TRANSFER_BLOCK, &op, writeTimeout);
160 } catch (const HdfsCanceled & e) {
161 throw;
162 } catch (const HdfsException & e) {
163 NESTED_THROW(HdfsIOException,
164 "DataTransferProtocolSender cannot send transfer request to datanode %s.",
165 datanode.c_str());
166 }
167}
168
169void DataTransferProtocolSender::blockChecksum(const ExtendedBlock & blk,
170 const Token & blockToken) {
171 try {
172 //TODO
173 } catch (const HdfsCanceled & e) {
174 throw;
175 } catch (const HdfsException & e) {
176 NESTED_THROW(HdfsIOException,
177 "DataTransferProtocolSender cannot send checksum request to datanode %s.",
178 datanode.c_str());
179 }
180}
181
182void DataTransferProtocolSender::requestShortCircuitFds(const ExtendedBlock blk,
183 const Token& blockToken,
184 uint32_t maxVersion) {
185 try {
186 OpRequestShortCircuitAccessProto op;
187 BuildBaseHeader(blk, blockToken, op.mutable_header());
188 op.set_maxversion(maxVersion);
189
190 Send(sock, REQUEST_SHORT_CIRCUIT_FDS, &op, writeTimeout);
191 } catch (const HdfsCanceled& e) {
192 throw;
193 } catch (const HdfsException& e) {
194 NESTED_THROW(HdfsIOException,
195 "DataTransferProtocolSender cannot send request "
196 "short-circuit fds request "
197 "to datanode %s.",
198 datanode.c_str());
199 }
200}
201}
202}
203
204