1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
29#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
30
31#include "Atomic.h"
32#include "Checksum.h"
33#include "DateTime.h"
34#include "ExceptionInternal.h"
35#include "FileSystem.h"
36#include "Memory.h"
37#include "OutputStreamInter.h"
38#include "PacketPool.h"
39#include "Permission.h"
40#include "Pipeline.h"
41#include "server/LocatedBlock.h"
42#include "SessionConfig.h"
43#include "Thread.h"
44#ifdef MOCK
45#include "PipelineStub.h"
46#endif
47
48namespace Hdfs {
49namespace Internal {
50/**
51 * A output stream used to write data to hdfs.
52 */
53class OutputStreamImpl: public OutputStreamInter {
54public:
55 OutputStreamImpl();
56
57 ~OutputStreamImpl();
58
59 /**
60 * To create or append a file.
61 * @param fs hdfs file system.
62 * @param path the file path.
63 * @param flag creation flag, can be Create, Append or Create|Overwrite.
64 * @param permission create a new file with given permission.
65 * @param createParent if the parent does not exist, create it.
66 * @param replication create a file with given number of replication.
67 * @param blockSize create a file with given block size.
68 */
69 void open(shared_ptr<FileSystemInter> fs, const char * path, int flag,
70 const Permission & permission, bool createParent, int replication,
71 int64_t blockSize);
72
73 /**
74 * To append data to file.
75 * @param buf the data used to append.
76 * @param size the data size.
77 */
78 void append(const char * buf, int64_t size);
79
80 /**
81 * Flush all data in buffer and waiting for ack.
82 * Will block until get all acks.
83 */
84 void flush();
85
86 /**
87 * return the current file length.
88 * @return current file length.
89 */
90 int64_t tell();
91
92 /**
93 * @ref OutputStream::sync
94 */
95 void sync();
96
97 /**
98 * close the stream.
99 */
100 void close();
101
102 /**
103 * Output a readable string of this output stream.
104 */
105 std::string toString();
106
107 /**
108 * Keep the last error of this stream.
109 * @error the error to be kept.
110 */
111 void setError(const exception_ptr & error);
112
113private:
114 void appendChunkToPacket(const char * buf, int size);
115 void appendInternal(const char * buf, int64_t size);
116 void checkStatus();
117 void closePipeline();
118 void completeFile(bool throwError);
119 void computePacketChunkSize();
120 void flushInternal(bool needSync);
121 //void heartBeatSenderRoutine();
122 void initAppend();
123 void openInternal(shared_ptr<FileSystemInter> fs, const char * path, int flag,
124 const Permission & permission, bool createParent, int replication,
125 int64_t blockSize);
126 void reset();
127 void sendPacket(shared_ptr<Packet> packet);
128 void setupPipeline();
129
130private:
131 //atomic<bool> heartBeatStop;
132 bool closed;
133 bool isAppend;
134 bool syncBlock;
135 //condition_variable condHeartBeatSender;
136 exception_ptr lastError;
137 int checksumSize;
138 int chunkSize;
139 int chunksPerPacket;
140 int closeTimeout;
141 int heartBeatInterval;
142 int packetSize;
143 int position; //cursor in buffer
144 int replication;
145 int64_t blockSize; //max size of block
146 int64_t bytesWritten; //the size of bytes has be written into packet (not include the data in chunk buffer).
147 int64_t cursor; //cursor in file.
148 int64_t lastFlushed; //the position last flushed
149 int64_t nextSeqNo;
150 mutex mut;
151 PacketPool packets;
152 shared_ptr<Checksum> checksum;
153 shared_ptr<FileSystemInter> filesystem;
154 shared_ptr<LocatedBlock> lastBlock;
155 shared_ptr<Packet> currentPacket;
156 shared_ptr<Pipeline> pipeline;
157 shared_ptr<SessionConfig> conf;
158 std::string path;
159 std::vector<char> buffer;
160 steady_clock::time_point lastSend;
161 //thread heartBeatSender;
162
163 friend class Pipeline;
164#ifdef MOCK
165private:
166 Hdfs::Mock::PipelineStub * stub;
167#endif
168};
169
170}
171}
172
173#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ */
174