1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef PARQUET_COLUMN_SCANNER_H |
19 | #define PARQUET_COLUMN_SCANNER_H |
20 | |
21 | #include <stdio.h> |
22 | #include <cstdint> |
23 | #include <memory> |
24 | #include <ostream> |
25 | #include <string> |
26 | #include <vector> |
27 | |
28 | #include "parquet/column_reader.h" |
29 | #include "parquet/exception.h" |
30 | #include "parquet/schema.h" |
31 | #include "parquet/types.h" |
32 | #include "parquet/util/macros.h" |
33 | #include "parquet/util/memory.h" |
34 | #include "parquet/util/visibility.h" |
35 | |
36 | namespace parquet { |
37 | |
// Default number of entries a scanner requests from its column reader per
// batch; used as the default `batch_size` of Scanner and TypedScanner.
// (Namespace-scope constexpr variables already have internal linkage.)
constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
39 | |
40 | class PARQUET_EXPORT Scanner { |
41 | public: |
42 | explicit Scanner(std::shared_ptr<ColumnReader> reader, |
43 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
44 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
45 | : batch_size_(batch_size), |
46 | level_offset_(0), |
47 | levels_buffered_(0), |
48 | value_buffer_(AllocateBuffer(pool)), |
49 | value_offset_(0), |
50 | values_buffered_(0), |
51 | reader_(reader) { |
52 | def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0); |
53 | rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0); |
54 | } |
55 | |
56 | virtual ~Scanner() {} |
57 | |
58 | static std::shared_ptr<Scanner> Make( |
59 | std::shared_ptr<ColumnReader> col_reader, |
60 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
61 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
62 | |
63 | virtual void PrintNext(std::ostream& out, int width) = 0; |
64 | |
65 | bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } |
66 | |
67 | const ColumnDescriptor* descr() const { return reader_->descr(); } |
68 | |
69 | int64_t batch_size() const { return batch_size_; } |
70 | |
71 | void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } |
72 | |
73 | protected: |
74 | int64_t batch_size_; |
75 | |
76 | std::vector<int16_t> def_levels_; |
77 | std::vector<int16_t> rep_levels_; |
78 | int level_offset_; |
79 | int levels_buffered_; |
80 | |
81 | std::shared_ptr<ResizableBuffer> value_buffer_; |
82 | int value_offset_; |
83 | int64_t values_buffered_; |
84 | |
85 | private: |
86 | std::shared_ptr<ColumnReader> reader_; |
87 | }; |
88 | |
89 | template <typename DType> |
90 | class PARQUET_EXPORT TypedScanner : public Scanner { |
91 | public: |
92 | typedef typename DType::c_type T; |
93 | |
94 | explicit TypedScanner(std::shared_ptr<ColumnReader> reader, |
95 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
96 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
97 | : Scanner(reader, batch_size, pool) { |
98 | typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get()); |
99 | int value_byte_size = type_traits<DType::type_num>::value_byte_size; |
100 | PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); |
101 | values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); |
102 | } |
103 | |
104 | virtual ~TypedScanner() {} |
105 | |
106 | bool NextLevels(int16_t* def_level, int16_t* rep_level) { |
107 | if (level_offset_ == levels_buffered_) { |
108 | levels_buffered_ = static_cast<int>( |
109 | typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), |
110 | rep_levels_.data(), values_, &values_buffered_)); |
111 | |
112 | value_offset_ = 0; |
113 | level_offset_ = 0; |
114 | if (!levels_buffered_) { |
115 | return false; |
116 | } |
117 | } |
118 | *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; |
119 | *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; |
120 | level_offset_++; |
121 | return true; |
122 | } |
123 | |
124 | bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { |
125 | if (level_offset_ == levels_buffered_) { |
126 | if (!HasNext()) { |
127 | // Out of data pages |
128 | return false; |
129 | } |
130 | } |
131 | |
132 | NextLevels(def_level, rep_level); |
133 | *is_null = *def_level < descr()->max_definition_level(); |
134 | |
135 | if (*is_null) { |
136 | return true; |
137 | } |
138 | |
139 | if (value_offset_ == values_buffered_) { |
140 | throw ParquetException("Value was non-null, but has not been buffered" ); |
141 | } |
142 | *val = values_[value_offset_++]; |
143 | return true; |
144 | } |
145 | |
146 | // Returns true if there is a next value |
147 | bool NextValue(T* val, bool* is_null) { |
148 | if (level_offset_ == levels_buffered_) { |
149 | if (!HasNext()) { |
150 | // Out of data pages |
151 | return false; |
152 | } |
153 | } |
154 | |
155 | // Out of values |
156 | int16_t def_level = -1; |
157 | int16_t rep_level = -1; |
158 | NextLevels(&def_level, &rep_level); |
159 | *is_null = def_level < descr()->max_definition_level(); |
160 | |
161 | if (*is_null) { |
162 | return true; |
163 | } |
164 | |
165 | if (value_offset_ == values_buffered_) { |
166 | throw ParquetException("Value was non-null, but has not been buffered" ); |
167 | } |
168 | *val = values_[value_offset_++]; |
169 | return true; |
170 | } |
171 | |
172 | virtual void PrintNext(std::ostream& out, int width) { |
173 | T val; |
174 | bool is_null = false; |
175 | char buffer[25]; |
176 | |
177 | if (!NextValue(&val, &is_null)) { |
178 | throw ParquetException("No more values buffered" ); |
179 | } |
180 | |
181 | if (is_null) { |
182 | std::string null_fmt = format_fwf<ByteArrayType>(width); |
183 | snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL" ); |
184 | } else { |
185 | FormatValue(&val, buffer, sizeof(buffer), width); |
186 | } |
187 | out << buffer; |
188 | } |
189 | |
190 | private: |
191 | // The ownership of this object is expressed through the reader_ variable in the base |
192 | TypedColumnReader<DType>* typed_reader_; |
193 | |
194 | inline void FormatValue(void* val, char* buffer, int bufsize, int width); |
195 | |
196 | T* values_; |
197 | }; |
198 | |
199 | template <typename DType> |
200 | inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, |
201 | int width) { |
202 | std::string fmt = format_fwf<DType>(width); |
203 | snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); |
204 | } |
205 | |
206 | template <> |
207 | inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, |
208 | int width) { |
209 | std::string fmt = format_fwf<Int96Type>(width); |
210 | std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); |
211 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
212 | } |
213 | |
214 | template <> |
215 | inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, |
216 | int width) { |
217 | std::string fmt = format_fwf<ByteArrayType>(width); |
218 | std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); |
219 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
220 | } |
221 | |
222 | template <> |
223 | inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, |
224 | int width) { |
225 | std::string fmt = format_fwf<FLBAType>(width); |
226 | std::string result = FixedLenByteArrayToString( |
227 | *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); |
228 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
229 | } |
230 | |
231 | typedef TypedScanner<BooleanType> BoolScanner; |
232 | typedef TypedScanner<Int32Type> Int32Scanner; |
233 | typedef TypedScanner<Int64Type> Int64Scanner; |
234 | typedef TypedScanner<Int96Type> Int96Scanner; |
235 | typedef TypedScanner<FloatType> FloatScanner; |
236 | typedef TypedScanner<DoubleType> DoubleScanner; |
237 | typedef TypedScanner<ByteArrayType> ByteArrayScanner; |
238 | typedef TypedScanner<FLBAType> FixedLenByteArrayScanner; |
239 | |
240 | template <typename RType> |
241 | int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
242 | uint8_t* values, int64_t* values_buffered, |
243 | parquet::ColumnReader* reader) { |
244 | typedef typename RType::T Type; |
245 | auto typed_reader = static_cast<RType*>(reader); |
246 | auto vals = reinterpret_cast<Type*>(&values[0]); |
247 | return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, |
248 | values_buffered); |
249 | } |
250 | |
251 | int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels, |
252 | int16_t* rep_levels, uint8_t* values, |
253 | int64_t* values_buffered, |
254 | parquet::ColumnReader* reader); |
255 | |
256 | } // namespace parquet |
257 | |
258 | #endif // PARQUET_COLUMN_SCANNER_H |
259 | |