1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef PARQUET_COLUMN_SCANNER_H |
19 | #define PARQUET_COLUMN_SCANNER_H |
20 | |
#include <stdio.h>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"
33 | |
34 | namespace parquet { |
35 | |
// Default number of levels a Scanner requests from its ColumnReader per batch.
static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE{128};
37 | |
38 | class PARQUET_EXPORT Scanner { |
39 | public: |
40 | explicit Scanner(std::shared_ptr<ColumnReader> reader, |
41 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
42 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
43 | : batch_size_(batch_size), |
44 | level_offset_(0), |
45 | levels_buffered_(0), |
46 | value_buffer_(AllocateBuffer(pool)), |
47 | value_offset_(0), |
48 | values_buffered_(0), |
49 | reader_(reader) { |
50 | def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0); |
51 | rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0); |
52 | } |
53 | |
54 | virtual ~Scanner() {} |
55 | |
56 | static std::shared_ptr<Scanner> Make( |
57 | std::shared_ptr<ColumnReader> col_reader, |
58 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
59 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
60 | |
61 | virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0; |
62 | |
63 | bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } |
64 | |
65 | const ColumnDescriptor* descr() const { return reader_->descr(); } |
66 | |
67 | int64_t batch_size() const { return batch_size_; } |
68 | |
69 | void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } |
70 | |
71 | protected: |
72 | int64_t batch_size_; |
73 | |
74 | std::vector<int16_t> def_levels_; |
75 | std::vector<int16_t> rep_levels_; |
76 | int level_offset_; |
77 | int levels_buffered_; |
78 | |
79 | std::shared_ptr<ResizableBuffer> value_buffer_; |
80 | int value_offset_; |
81 | int64_t values_buffered_; |
82 | |
83 | private: |
84 | std::shared_ptr<ColumnReader> reader_; |
85 | }; |
86 | |
87 | template <typename DType> |
88 | class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { |
89 | public: |
90 | typedef typename DType::c_type T; |
91 | |
92 | explicit TypedScanner(std::shared_ptr<ColumnReader> reader, |
93 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
94 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
95 | : Scanner(reader, batch_size, pool) { |
96 | typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get()); |
97 | int value_byte_size = type_traits<DType::type_num>::value_byte_size; |
98 | PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); |
99 | values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); |
100 | } |
101 | |
102 | virtual ~TypedScanner() {} |
103 | |
104 | bool NextLevels(int16_t* def_level, int16_t* rep_level) { |
105 | if (level_offset_ == levels_buffered_) { |
106 | levels_buffered_ = static_cast<int>( |
107 | typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), |
108 | rep_levels_.data(), values_, &values_buffered_)); |
109 | |
110 | value_offset_ = 0; |
111 | level_offset_ = 0; |
112 | if (!levels_buffered_) { |
113 | return false; |
114 | } |
115 | } |
116 | *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; |
117 | *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; |
118 | level_offset_++; |
119 | return true; |
120 | } |
121 | |
122 | bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { |
123 | if (level_offset_ == levels_buffered_) { |
124 | if (!HasNext()) { |
125 | // Out of data pages |
126 | return false; |
127 | } |
128 | } |
129 | |
130 | NextLevels(def_level, rep_level); |
131 | *is_null = *def_level < descr()->max_definition_level(); |
132 | |
133 | if (*is_null) { |
134 | return true; |
135 | } |
136 | |
137 | if (value_offset_ == values_buffered_) { |
138 | throw ParquetException("Value was non-null, but has not been buffered" ); |
139 | } |
140 | *val = values_[value_offset_++]; |
141 | return true; |
142 | } |
143 | |
144 | // Returns true if there is a next value |
145 | bool NextValue(T* val, bool* is_null) { |
146 | if (level_offset_ == levels_buffered_) { |
147 | if (!HasNext()) { |
148 | // Out of data pages |
149 | return false; |
150 | } |
151 | } |
152 | |
153 | // Out of values |
154 | int16_t def_level = -1; |
155 | int16_t rep_level = -1; |
156 | NextLevels(&def_level, &rep_level); |
157 | *is_null = def_level < descr()->max_definition_level(); |
158 | |
159 | if (*is_null) { |
160 | return true; |
161 | } |
162 | |
163 | if (value_offset_ == values_buffered_) { |
164 | throw ParquetException("Value was non-null, but has not been buffered" ); |
165 | } |
166 | *val = values_[value_offset_++]; |
167 | return true; |
168 | } |
169 | |
170 | virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) { |
171 | T val; |
172 | int16_t def_level = -1; |
173 | int16_t rep_level = -1; |
174 | bool is_null = false; |
175 | char buffer[80]; |
176 | |
177 | if (!Next(&val, &def_level, &rep_level, &is_null)) { |
178 | throw ParquetException("No more values buffered" ); |
179 | } |
180 | |
181 | if (with_levels) { |
182 | out << " D:" << def_level << " R:" << rep_level << " " ; |
183 | if (!is_null) { |
184 | out << "V:" ; |
185 | } |
186 | } |
187 | |
188 | if (is_null) { |
189 | std::string null_fmt = format_fwf<ByteArrayType>(width); |
190 | snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL" ); |
191 | } else { |
192 | FormatValue(&val, buffer, sizeof(buffer), width); |
193 | } |
194 | out << buffer; |
195 | } |
196 | |
197 | private: |
198 | // The ownership of this object is expressed through the reader_ variable in the base |
199 | TypedColumnReader<DType>* typed_reader_; |
200 | |
201 | inline void FormatValue(void* val, char* buffer, int bufsize, int width); |
202 | |
203 | T* values_; |
204 | }; |
205 | |
206 | template <typename DType> |
207 | inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, |
208 | int width) { |
209 | std::string fmt = format_fwf<DType>(width); |
210 | snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); |
211 | } |
212 | |
213 | template <> |
214 | inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, |
215 | int width) { |
216 | std::string fmt = format_fwf<Int96Type>(width); |
217 | std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); |
218 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
219 | } |
220 | |
221 | template <> |
222 | inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, |
223 | int width) { |
224 | std::string fmt = format_fwf<ByteArrayType>(width); |
225 | std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); |
226 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
227 | } |
228 | |
229 | template <> |
230 | inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, |
231 | int width) { |
232 | std::string fmt = format_fwf<FLBAType>(width); |
233 | std::string result = FixedLenByteArrayToString( |
234 | *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); |
235 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
236 | } |
237 | |
238 | typedef TypedScanner<BooleanType> BoolScanner; |
239 | typedef TypedScanner<Int32Type> Int32Scanner; |
240 | typedef TypedScanner<Int64Type> Int64Scanner; |
241 | typedef TypedScanner<Int96Type> Int96Scanner; |
242 | typedef TypedScanner<FloatType> FloatScanner; |
243 | typedef TypedScanner<DoubleType> DoubleScanner; |
244 | typedef TypedScanner<ByteArrayType> ByteArrayScanner; |
245 | typedef TypedScanner<FLBAType> FixedLenByteArrayScanner; |
246 | |
247 | template <typename RType> |
248 | int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
249 | uint8_t* values, int64_t* values_buffered, |
250 | parquet::ColumnReader* reader) { |
251 | typedef typename RType::T Type; |
252 | auto typed_reader = static_cast<RType*>(reader); |
253 | auto vals = reinterpret_cast<Type*>(&values[0]); |
254 | return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, |
255 | values_buffered); |
256 | } |
257 | |
258 | int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels, |
259 | int16_t* rep_levels, uint8_t* values, |
260 | int64_t* values_buffered, |
261 | parquet::ColumnReader* reader); |
262 | |
263 | } // namespace parquet |
264 | |
265 | #endif // PARQUET_COLUMN_SCANNER_H |
266 | |