| 1 | /** |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, software |
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | * See the License for the specific language governing permissions and |
| 16 | * limitations under the License. |
| 17 | */ |
| 18 | |
| 19 | #ifndef ORC_COLUMN_WRITER_HH |
| 20 | #define ORC_COLUMN_WRITER_HH |
| 21 | |
#include "orc/Vector.hh"

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "ByteRLE.hh"
#include "Compression.hh"
#include "orc/Exceptions.hh"
#include "Statistics.hh"

#include "wrap/orc-proto-wrapper.hh"
| 30 | |
| 31 | namespace orc { |
| 32 | |
| 33 | class StreamsFactory { |
| 34 | public: |
| 35 | virtual ~StreamsFactory(); |
| 36 | |
| 37 | /** |
| 38 | * Get the stream for the given column/kind in this stripe. |
| 39 | * @param kind the kind of the stream |
| 40 | * @return the buffer output stream |
| 41 | */ |
| 42 | virtual std::unique_ptr<BufferedOutputStream> |
| 43 | createStream(proto::Stream_Kind kind) const = 0; |
| 44 | }; |
| 45 | |
| 46 | std::unique_ptr<StreamsFactory> createStreamsFactory( |
| 47 | const WriterOptions& options, |
| 48 | OutputStream * outStream); |
| 49 | |
| 50 | /** |
| 51 | * record stream positions for row index |
| 52 | */ |
| 53 | class RowIndexPositionRecorder : public PositionRecorder { |
| 54 | public: |
| 55 | virtual ~RowIndexPositionRecorder() override; |
| 56 | |
| 57 | RowIndexPositionRecorder(proto::RowIndexEntry& entry): |
| 58 | rowIndexEntry(entry) {} |
| 59 | |
| 60 | virtual void add(uint64_t pos) override { |
| 61 | rowIndexEntry.add_positions(pos); |
| 62 | } |
| 63 | |
| 64 | private: |
| 65 | proto::RowIndexEntry& rowIndexEntry; |
| 66 | }; |
| 67 | |
| 68 | /** |
| 69 | * The interface for writing ORC data types. |
| 70 | */ |
| 71 | class ColumnWriter { |
| 72 | protected: |
| 73 | std::unique_ptr<ByteRleEncoder> notNullEncoder; |
| 74 | uint64_t columnId; |
| 75 | std::unique_ptr<MutableColumnStatistics> colIndexStatistics; |
| 76 | std::unique_ptr<MutableColumnStatistics> colStripeStatistics; |
| 77 | std::unique_ptr<MutableColumnStatistics> colFileStatistics; |
| 78 | |
| 79 | bool enableIndex; |
| 80 | // row index for this column, contains all RowIndexEntries in 1 stripe |
| 81 | std::unique_ptr<proto::RowIndex> rowIndex; |
| 82 | std::unique_ptr<proto::RowIndexEntry> rowIndexEntry; |
| 83 | std::unique_ptr<RowIndexPositionRecorder> rowIndexPosition; |
| 84 | |
| 85 | public: |
| 86 | ColumnWriter(const Type& type, const StreamsFactory& factory, |
| 87 | const WriterOptions& options); |
| 88 | |
| 89 | virtual ~ColumnWriter(); |
| 90 | |
| 91 | /** |
| 92 | * Write the next group of values from this rowBatch. |
| 93 | * @param rowBatch the row batch data to write |
| 94 | * @param offset the starting point of row batch to write |
| 95 | * @param numValues the number of values to write |
| 96 | */ |
| 97 | virtual void add(ColumnVectorBatch& rowBatch, |
| 98 | uint64_t offset, |
| 99 | uint64_t numValues); |
| 100 | /** |
| 101 | * Flush column writer output steams |
| 102 | * @param streams vector to store generated stream by flush() |
| 103 | */ |
| 104 | virtual void flush(std::vector<proto::Stream>& streams); |
| 105 | |
| 106 | /** |
| 107 | * Get estimated sized of buffer used |
| 108 | */ |
| 109 | virtual uint64_t getEstimatedSize() const; |
| 110 | |
| 111 | /** |
| 112 | * Get the encoding used by the writer for this column. |
| 113 | * ColumnEncoding info is pushed into the vector |
| 114 | */ |
| 115 | virtual void getColumnEncoding( |
| 116 | std::vector<proto::ColumnEncoding>& encodings) const = 0; |
| 117 | |
| 118 | /** |
| 119 | * Get the stripe statistics for this column |
| 120 | */ |
| 121 | virtual void getStripeStatistics( |
| 122 | std::vector<proto::ColumnStatistics>& stats) const; |
| 123 | |
| 124 | /** |
| 125 | * Get the file statistics for this column |
| 126 | */ |
| 127 | virtual void getFileStatistics( |
| 128 | std::vector<proto::ColumnStatistics>& stats) const; |
| 129 | |
| 130 | /** |
| 131 | * Merge index stats into stripe stats and reset index stats |
| 132 | */ |
| 133 | virtual void mergeRowGroupStatsIntoStripeStats(); |
| 134 | |
| 135 | /** |
| 136 | * Merge stripe stats into file stats and reset stripe stats |
| 137 | */ |
| 138 | virtual void mergeStripeStatsIntoFileStats(); |
| 139 | |
| 140 | /** |
| 141 | * Create a row index entry with the previous location and the current |
| 142 | * index statistics. Also merges the index statistics into the stripe |
| 143 | * statistics before they are cleared. Finally, it records the start of the |
| 144 | * next index and ensures all of the children columns also create an entry. |
| 145 | */ |
| 146 | virtual void createRowIndexEntry(); |
| 147 | |
| 148 | /** |
| 149 | * Write row index streams for this column |
| 150 | * @param streams output list of ROW_INDEX streams |
| 151 | */ |
| 152 | virtual void writeIndex(std::vector<proto::Stream> &streams) const; |
| 153 | |
| 154 | /** |
| 155 | * Record positions for index |
| 156 | * |
| 157 | * This function is called by createRowIndexEntry() and ColumnWrtier's |
| 158 | * constructor. So base classes do not need to call inherited classes' |
| 159 | * recordPosition() function. |
| 160 | */ |
| 161 | virtual void recordPosition() const; |
| 162 | |
| 163 | /** |
| 164 | * Reset positions for index |
| 165 | */ |
| 166 | virtual void reset(); |
| 167 | |
| 168 | protected: |
| 169 | /** |
| 170 | * Utility function to translate ColumnStatistics into protobuf form and |
| 171 | * add it to output list |
| 172 | * @param statsList output list for protobuf stats |
| 173 | * @param stats ColumnStatistics to be transformed and added |
| 174 | */ |
| 175 | void getProtoBufStatistics( |
| 176 | std::vector<proto::ColumnStatistics>& statsList, |
| 177 | const MutableColumnStatistics* stats) const { |
| 178 | proto::ColumnStatistics pbStats; |
| 179 | stats->toProtoBuf(pbStats); |
| 180 | statsList.push_back(pbStats); |
| 181 | } |
| 182 | |
| 183 | protected: |
| 184 | MemoryPool& memPool; |
| 185 | std::unique_ptr<BufferedOutputStream> indexStream; |
| 186 | }; |
| 187 | |
| 188 | /** |
| 189 | * Create a writer for the given type. |
| 190 | */ |
| 191 | std::unique_ptr<ColumnWriter> buildWriter( |
| 192 | const Type& type, |
| 193 | const StreamsFactory& factory, |
| 194 | const WriterOptions& options); |
| 195 | } |
| 196 | |
| 197 | #endif |
| 198 | |