1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#ifndef ORC_COLUMN_WRITER_HH
20#define ORC_COLUMN_WRITER_HH
21
22#include "orc/Vector.hh"
23
24#include "ByteRLE.hh"
25#include "Compression.hh"
26#include "orc/Exceptions.hh"
27#include "Statistics.hh"
28
29#include "wrap/orc-proto-wrapper.hh"
30
31namespace orc {
32
33 class StreamsFactory {
34 public:
35 virtual ~StreamsFactory();
36
37 /**
38 * Get the stream for the given column/kind in this stripe.
39 * @param kind the kind of the stream
40 * @return the buffer output stream
41 */
42 virtual std::unique_ptr<BufferedOutputStream>
43 createStream(proto::Stream_Kind kind) const = 0;
44 };
45
46 std::unique_ptr<StreamsFactory> createStreamsFactory(
47 const WriterOptions& options,
48 OutputStream * outStream);
49
50 /**
51 * record stream positions for row index
52 */
53 class RowIndexPositionRecorder : public PositionRecorder {
54 public:
55 virtual ~RowIndexPositionRecorder() override;
56
57 RowIndexPositionRecorder(proto::RowIndexEntry& entry):
58 rowIndexEntry(entry) {}
59
60 virtual void add(uint64_t pos) override {
61 rowIndexEntry.add_positions(pos);
62 }
63
64 private:
65 proto::RowIndexEntry& rowIndexEntry;
66 };
67
68 /**
69 * The interface for writing ORC data types.
70 */
71 class ColumnWriter {
72 protected:
73 std::unique_ptr<ByteRleEncoder> notNullEncoder;
74 uint64_t columnId;
75 std::unique_ptr<MutableColumnStatistics> colIndexStatistics;
76 std::unique_ptr<MutableColumnStatistics> colStripeStatistics;
77 std::unique_ptr<MutableColumnStatistics> colFileStatistics;
78
79 bool enableIndex;
80 // row index for this column, contains all RowIndexEntries in 1 stripe
81 std::unique_ptr<proto::RowIndex> rowIndex;
82 std::unique_ptr<proto::RowIndexEntry> rowIndexEntry;
83 std::unique_ptr<RowIndexPositionRecorder> rowIndexPosition;
84
85 public:
86 ColumnWriter(const Type& type, const StreamsFactory& factory,
87 const WriterOptions& options);
88
89 virtual ~ColumnWriter();
90
91 /**
92 * Write the next group of values from this rowBatch.
93 * @param rowBatch the row batch data to write
94 * @param offset the starting point of row batch to write
95 * @param numValues the number of values to write
96 */
97 virtual void add(ColumnVectorBatch& rowBatch,
98 uint64_t offset,
99 uint64_t numValues);
100 /**
101 * Flush column writer output steams
102 * @param streams vector to store generated stream by flush()
103 */
104 virtual void flush(std::vector<proto::Stream>& streams);
105
106 /**
107 * Get estimated sized of buffer used
108 */
109 virtual uint64_t getEstimatedSize() const;
110
111 /**
112 * Get the encoding used by the writer for this column.
113 * ColumnEncoding info is pushed into the vector
114 */
115 virtual void getColumnEncoding(
116 std::vector<proto::ColumnEncoding>& encodings) const = 0;
117
118 /**
119 * Get the stripe statistics for this column
120 */
121 virtual void getStripeStatistics(
122 std::vector<proto::ColumnStatistics>& stats) const;
123
124 /**
125 * Get the file statistics for this column
126 */
127 virtual void getFileStatistics(
128 std::vector<proto::ColumnStatistics>& stats) const;
129
130 /**
131 * Merge index stats into stripe stats and reset index stats
132 */
133 virtual void mergeRowGroupStatsIntoStripeStats();
134
135 /**
136 * Merge stripe stats into file stats and reset stripe stats
137 */
138 virtual void mergeStripeStatsIntoFileStats();
139
140 /**
141 * Create a row index entry with the previous location and the current
142 * index statistics. Also merges the index statistics into the stripe
143 * statistics before they are cleared. Finally, it records the start of the
144 * next index and ensures all of the children columns also create an entry.
145 */
146 virtual void createRowIndexEntry();
147
148 /**
149 * Write row index streams for this column
150 * @param streams output list of ROW_INDEX streams
151 */
152 virtual void writeIndex(std::vector<proto::Stream> &streams) const;
153
154 /**
155 * Record positions for index
156 *
157 * This function is called by createRowIndexEntry() and ColumnWrtier's
158 * constructor. So base classes do not need to call inherited classes'
159 * recordPosition() function.
160 */
161 virtual void recordPosition() const;
162
163 /**
164 * Reset positions for index
165 */
166 virtual void reset();
167
168 protected:
169 /**
170 * Utility function to translate ColumnStatistics into protobuf form and
171 * add it to output list
172 * @param statsList output list for protobuf stats
173 * @param stats ColumnStatistics to be transformed and added
174 */
175 void getProtoBufStatistics(
176 std::vector<proto::ColumnStatistics>& statsList,
177 const MutableColumnStatistics* stats) const {
178 proto::ColumnStatistics pbStats;
179 stats->toProtoBuf(pbStats);
180 statsList.push_back(pbStats);
181 }
182
183 protected:
184 MemoryPool& memPool;
185 std::unique_ptr<BufferedOutputStream> indexStream;
186 };
187
188 /**
189 * Create a writer for the given type.
190 */
191 std::unique_ptr<ColumnWriter> buildWriter(
192 const Type& type,
193 const StreamsFactory& factory,
194 const WriterOptions& options);
195}
196
197#endif
198