1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_
29#define _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_
30
31#include "FileSystemInter.h"
32#include "Memory.h"
33#include "network/BufferedSocketReader.h"
34#include "network/TcpSocket.h"
35#include "Packet.h"
36#include "PacketPool.h"
37#include "PipelineAck.h"
38#include "server/DatanodeInfo.h"
39#include "server/LocatedBlock.h"
40#include "server/Namenode.h"
41#include "SessionConfig.h"
42#include "Thread.h"
43
44#include <vector>
45#include <deque>
46
47namespace Hdfs {
48namespace Internal {
49
/**
 * Stages a block goes through while it is being constructed by a write
 * pipeline.
 *
 * Ordering contract: every regular stage is listed immediately before its
 * recovery stage (APPEND -> APPEND_RECOVERY, DATA_STREAMING ->
 * SETUP_STREAMING_RECOVERY, CLOSE -> CLOSE_RECOVERY). getRecoveryStage,
 * defined elsewhere, depends on this layout, so the enumerators must not be
 * reordered or renumbered.
 */
enum BlockConstructionStage {
    PIPELINE_SETUP_APPEND = 0,             ///< pipeline set up for appending to a block
    PIPELINE_SETUP_APPEND_RECOVERY = 1,    ///< recovery of a failed PIPELINE_SETUP_APPEND
    DATA_STREAMING = 2,                    ///< data is streaming through the pipeline
    PIPELINE_SETUP_STREAMING_RECOVERY = 3, ///< pipeline set up to recover from failed data streaming
    PIPELINE_CLOSE = 4,                    ///< closing the block and the pipeline
    PIPELINE_CLOSE_RECOVERY = 5,           ///< recovery of a failed PIPELINE_CLOSE
    PIPELINE_SETUP_CREATE = 6              ///< pipeline set up for creating a block (no recovery stage follows it)
};
71
72static inline const char * StageToString(BlockConstructionStage stage) {
73 switch (stage) {
74 case PIPELINE_SETUP_APPEND:
75 return "PIPELINE_SETUP_APPEND";
76
77 case PIPELINE_SETUP_APPEND_RECOVERY:
78 return "PIPELINE_SETUP_APPEND_RECOVERY";
79
80 case DATA_STREAMING:
81 return "DATA_STREAMING";
82
83 case PIPELINE_SETUP_STREAMING_RECOVERY:
84 return "PIPELINE_SETUP_STREAMING_RECOVERY";
85
86 case PIPELINE_CLOSE:
87 return "PIPELINE_CLOSE";
88
89 case PIPELINE_CLOSE_RECOVERY:
90 return "PIPELINE_CLOSE_RECOVERY";
91
92 case PIPELINE_SETUP_CREATE:
93 return "PIPELINE_SETUP_CREATE";
94
95 default:
96 return "UNKNOWN STAGE";
97 }
98}
99
// Forward declarations. Packet's full definition also arrives via the
// "Packet.h" include. OutputStreamImpl is not referenced in this header.
// NOTE(review): presumably kept for code that includes this header; confirm
// before removing.
class Packet;
class OutputStreamImpl;
102
103/**
104 * setup, data transfer, close, and failover.
105 */
106class Pipeline {
107public:
108
109 virtual ~Pipeline() {}
110
111 /**
112 * send all data and wait for all ack.
113 */
114 virtual void flush() = 0;
115
116 /**
117 * send LastPacket and close the pipeline.
118 */
119 virtual shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket) = 0;
120
121 /**
122 * send a packet, retry on error until fatal.
123 * @param packet
124 */
125 virtual void send(shared_ptr<Packet> packet) = 0;
126};
127
128class PipelineImpl : public Pipeline {
129public:
130 /**
131 * construct and setup the pipeline for append.
132 */
133 PipelineImpl(bool append, const char * path, const SessionConfig & conf,
134 shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
135 int replication, int64_t bytesSent, PacketPool & packetPool,
136 shared_ptr<LocatedBlock> lastBlock);
137
138 /**
139 * send all data and wait for all ack.
140 */
141 void flush();
142
143 /**
144 * send LastPacket and close the pipeline.
145 */
146 shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket);
147
148 /**
149 * send a packet, retry on error until fatal.
150 * @param packet
151 */
152 void send(shared_ptr<Packet> packet);
153
154private:
155 bool addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes);
156 void buildForAppendOrRecovery(bool recovery);
157 void buildForNewBlock();
158 void checkPipelineWithReplicas();
159 void checkResponse(bool wait);
160 void createBlockOutputStream(const Token & token, int64_t gs, bool recovery);
161 void locateNextBlock(const std::vector<DatanodeInfo> & excludedNodes);
162 void processAck(PipelineAck & ack);
163 void processResponse();
164 void resend();
165 void waitForAcks(bool force);
166 void transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
167 const std::vector<DatanodeInfo> & targets,
168 const Token & token);
169 int findNewDatanode(const std::vector<DatanodeInfo> & original);
170
171private:
172 static void checkBadLinkFormat(const std::string & node);
173
174private:
175 BlockConstructionStage stage;
176 bool canAddDatanode;
177 int blockWriteRetry;
178 int checksumType;
179 int chunkSize;
180 int connectTimeout;
181 int errorIndex;
182 int readTimeout;
183 int replication;
184 int writeTimeout;
185 int64_t bytesAcked; //the size of bytes the ack received.
186 int64_t bytesSent; //the size of bytes has sent.
187 PacketPool & packetPool;
188 shared_ptr<BufferedSocketReader> reader;
189 shared_ptr<FileSystemInter> filesystem;
190 shared_ptr<LocatedBlock> lastBlock;
191 shared_ptr<Socket> sock;
192 std::deque<shared_ptr<Packet> > packets;
193 std::string clientName;
194 std::string path;
195 std::vector<DatanodeInfo> nodes;
196 std::vector<std::string> storageIDs;
197
198};
199
200}
201}
202
203#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ */
204