1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ |
29 | #define _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ |
30 | |
31 | #include "FileSystemInter.h" |
32 | #include "Memory.h" |
33 | #include "network/BufferedSocketReader.h" |
34 | #include "network/TcpSocket.h" |
35 | #include "Packet.h" |
36 | #include "PacketPool.h" |
37 | #include "PipelineAck.h" |
38 | #include "server/DatanodeInfo.h" |
39 | #include "server/LocatedBlock.h" |
40 | #include "server/Namenode.h" |
41 | #include "SessionConfig.h" |
42 | #include "Thread.h" |
43 | |
44 | #include <vector> |
45 | #include <deque> |
46 | |
47 | namespace Hdfs { |
48 | namespace Internal { |
49 | |
/**
 * Stages of block construction used by the write pipeline.
 *
 * For values 0 through 5, every regular stage is immediately followed by
 * its recovery stage (regular = even value, recovery = regular + 1).
 * The enumerators must stay in this order: reordering them will break
 * getRecoveryStage.
 */
enum BlockConstructionStage {
    PIPELINE_SETUP_APPEND             = 0, // pipeline set up for block append
    PIPELINE_SETUP_APPEND_RECOVERY    = 1, // recovery of a failed PIPELINE_SETUP_APPEND
    DATA_STREAMING                    = 2, // data streaming
    PIPELINE_SETUP_STREAMING_RECOVERY = 3, // pipeline set up to recover failed data streaming
    PIPELINE_CLOSE                    = 4, // close the block and the pipeline
    PIPELINE_CLOSE_RECOVERY           = 5, // recovery of a failed PIPELINE_CLOSE
    PIPELINE_SETUP_CREATE             = 6  // pipeline set up for block creation
};
71 | |
72 | static inline const char * StageToString(BlockConstructionStage stage) { |
73 | switch (stage) { |
74 | case PIPELINE_SETUP_APPEND: |
75 | return "PIPELINE_SETUP_APPEND" ; |
76 | |
77 | case PIPELINE_SETUP_APPEND_RECOVERY: |
78 | return "PIPELINE_SETUP_APPEND_RECOVERY" ; |
79 | |
80 | case DATA_STREAMING: |
81 | return "DATA_STREAMING" ; |
82 | |
83 | case PIPELINE_SETUP_STREAMING_RECOVERY: |
84 | return "PIPELINE_SETUP_STREAMING_RECOVERY" ; |
85 | |
86 | case PIPELINE_CLOSE: |
87 | return "PIPELINE_CLOSE" ; |
88 | |
89 | case PIPELINE_CLOSE_RECOVERY: |
90 | return "PIPELINE_CLOSE_RECOVERY" ; |
91 | |
92 | case PIPELINE_SETUP_CREATE: |
93 | return "PIPELINE_SETUP_CREATE" ; |
94 | |
95 | default: |
96 | return "UNKNOWN STAGE" ; |
97 | } |
98 | } |
99 | |
100 | class Packet; |
101 | class OutputStreamImpl; |
102 | |
/**
 * Abstract interface of a pipeline.
 * A pipeline covers setup, data transfer, close, and failover.
 */
class Pipeline {
public:

    // Virtual, so an implementation can be destroyed through a Pipeline pointer.
    virtual ~Pipeline() {}

    /**
     * Send all data and wait for all acks.
     */
    virtual void flush() = 0;

    /**
     * Send the last packet and close the pipeline.
     * @param lastPacket the final packet to send before closing.
     * @return a LocatedBlock.
     * NOTE(review): presumably describes the block that was just closed;
     * confirm against the implementation.
     */
    virtual shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket) = 0;

    /**
     * Send a packet, retrying on error until the error is fatal.
     * @param packet the packet to send.
     */
    virtual void send(shared_ptr<Packet> packet) = 0;
};
127 | |
/**
 * Concrete Pipeline declared in this header.
 * Only declarations are visible here; the member functions are defined
 * elsewhere.
 */
class PipelineImpl : public Pipeline {
public:
    /**
     * Construct and set up the pipeline.
     * NOTE(review): the original comment said "for append", but the
     * `append` flag suggests that both append and new-block creation are
     * handled here; confirm against the implementation.
     * @param append presumably true to append to lastBlock and false to
     * start a new block; TODO confirm.
     * @param path path of the file being written (assumed from the name).
     * @param conf session configuration.
     * @param filesystem file system used by the pipeline.
     * @param checksumType checksum type identifier.
     * @param chunkSize chunk size; presumably in bytes, TODO confirm.
     * @param replication replication count (assumed from the name).
     * @param bytesSent initial value for the count of bytes already sent.
     * @param packetPool packet pool, held by reference; it must outlive
     * this object.
     * @param lastBlock last block of the file; presumably only meaningful
     * when appending, TODO confirm.
     */
    PipelineImpl(bool append, const char * path, const SessionConfig & conf,
                 shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                 int replication, int64_t bytesSent, PacketPool & packetPool,
                 shared_ptr<LocatedBlock> lastBlock);

    /**
     * Send all data and wait for all acks.
     */
    void flush();

    /**
     * Send the last packet and close the pipeline.
     * @param lastPacket the final packet to send before closing.
     * @return a LocatedBlock.
     * NOTE(review): presumably describes the block that was just closed;
     * confirm against the implementation.
     */
    shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket);

    /**
     * Send a packet, retrying on error until the error is fatal.
     * @param packet the packet to send.
     */
    void send(shared_ptr<Packet> packet);

private:
    // Internal helpers. Their behavior is defined in the implementation
    // file and is not visible from this header.
    bool addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes);
    void buildForAppendOrRecovery(bool recovery);
    void buildForNewBlock();
    void checkPipelineWithReplicas();
    void checkResponse(bool wait);
    void createBlockOutputStream(const Token & token, int64_t gs, bool recovery);
    void locateNextBlock(const std::vector<DatanodeInfo> & excludedNodes);
    void processAck(PipelineAck & ack);
    void processResponse();
    void resend();
    void waitForAcks(bool force);
    void transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
                  const std::vector<DatanodeInfo> & targets,
                  const Token & token);
    int findNewDatanode(const std::vector<DatanodeInfo> & original);

private:
    static void checkBadLinkFormat(const std::string & node);

private:
    // NOTE: non-static data members are initialized in declaration order,
    // so do not reorder these fields without checking the constructor.
    BlockConstructionStage stage; // current block construction stage.
    bool canAddDatanode;
    int blockWriteRetry;
    int checksumType;
    int chunkSize;
    int connectTimeout;
    int errorIndex;
    int readTimeout;
    int replication;
    int writeTimeout;
    int64_t bytesAcked; // number of bytes for which an ack has been received.
    int64_t bytesSent; // number of bytes that have been sent.
    PacketPool & packetPool; // reference member: the pool is not owned here.
    shared_ptr<BufferedSocketReader> reader;
    shared_ptr<FileSystemInter> filesystem;
    shared_ptr<LocatedBlock> lastBlock;
    shared_ptr<Socket> sock;
    std::deque<shared_ptr<Packet> > packets;
    std::string clientName;
    std::string path;
    std::vector<DatanodeInfo> nodes;
    std::vector<std::string> storageIDs;

};
199 | |
200 | } |
201 | } |
202 | |
203 | #endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ */ |
204 | |