1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_COLUMN_SCANNER_H
19#define PARQUET_COLUMN_SCANNER_H
20
#include <stdio.h>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#include "parquet/util/macros.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
35
36namespace parquet {
37
// Batch size used by Scanner / TypedScanner when the caller does not pass one:
// the number of levels requested from the ColumnReader per ReadBatch call.
constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
39
40class PARQUET_EXPORT Scanner {
41 public:
42 explicit Scanner(std::shared_ptr<ColumnReader> reader,
43 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
44 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
45 : batch_size_(batch_size),
46 level_offset_(0),
47 levels_buffered_(0),
48 value_buffer_(AllocateBuffer(pool)),
49 value_offset_(0),
50 values_buffered_(0),
51 reader_(reader) {
52 def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0);
53 rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0);
54 }
55
56 virtual ~Scanner() {}
57
58 static std::shared_ptr<Scanner> Make(
59 std::shared_ptr<ColumnReader> col_reader,
60 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
61 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
62
63 virtual void PrintNext(std::ostream& out, int width) = 0;
64
65 bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
66
67 const ColumnDescriptor* descr() const { return reader_->descr(); }
68
69 int64_t batch_size() const { return batch_size_; }
70
71 void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
72
73 protected:
74 int64_t batch_size_;
75
76 std::vector<int16_t> def_levels_;
77 std::vector<int16_t> rep_levels_;
78 int level_offset_;
79 int levels_buffered_;
80
81 std::shared_ptr<ResizableBuffer> value_buffer_;
82 int value_offset_;
83 int64_t values_buffered_;
84
85 private:
86 std::shared_ptr<ColumnReader> reader_;
87};
88
89template <typename DType>
90class PARQUET_EXPORT TypedScanner : public Scanner {
91 public:
92 typedef typename DType::c_type T;
93
94 explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
95 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
96 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
97 : Scanner(reader, batch_size, pool) {
98 typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
99 int value_byte_size = type_traits<DType::type_num>::value_byte_size;
100 PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
101 values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
102 }
103
104 virtual ~TypedScanner() {}
105
106 bool NextLevels(int16_t* def_level, int16_t* rep_level) {
107 if (level_offset_ == levels_buffered_) {
108 levels_buffered_ = static_cast<int>(
109 typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(),
110 rep_levels_.data(), values_, &values_buffered_));
111
112 value_offset_ = 0;
113 level_offset_ = 0;
114 if (!levels_buffered_) {
115 return false;
116 }
117 }
118 *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
119 *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
120 level_offset_++;
121 return true;
122 }
123
124 bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
125 if (level_offset_ == levels_buffered_) {
126 if (!HasNext()) {
127 // Out of data pages
128 return false;
129 }
130 }
131
132 NextLevels(def_level, rep_level);
133 *is_null = *def_level < descr()->max_definition_level();
134
135 if (*is_null) {
136 return true;
137 }
138
139 if (value_offset_ == values_buffered_) {
140 throw ParquetException("Value was non-null, but has not been buffered");
141 }
142 *val = values_[value_offset_++];
143 return true;
144 }
145
146 // Returns true if there is a next value
147 bool NextValue(T* val, bool* is_null) {
148 if (level_offset_ == levels_buffered_) {
149 if (!HasNext()) {
150 // Out of data pages
151 return false;
152 }
153 }
154
155 // Out of values
156 int16_t def_level = -1;
157 int16_t rep_level = -1;
158 NextLevels(&def_level, &rep_level);
159 *is_null = def_level < descr()->max_definition_level();
160
161 if (*is_null) {
162 return true;
163 }
164
165 if (value_offset_ == values_buffered_) {
166 throw ParquetException("Value was non-null, but has not been buffered");
167 }
168 *val = values_[value_offset_++];
169 return true;
170 }
171
172 virtual void PrintNext(std::ostream& out, int width) {
173 T val;
174 bool is_null = false;
175 char buffer[25];
176
177 if (!NextValue(&val, &is_null)) {
178 throw ParquetException("No more values buffered");
179 }
180
181 if (is_null) {
182 std::string null_fmt = format_fwf<ByteArrayType>(width);
183 snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
184 } else {
185 FormatValue(&val, buffer, sizeof(buffer), width);
186 }
187 out << buffer;
188 }
189
190 private:
191 // The ownership of this object is expressed through the reader_ variable in the base
192 TypedColumnReader<DType>* typed_reader_;
193
194 inline void FormatValue(void* val, char* buffer, int bufsize, int width);
195
196 T* values_;
197};
198
199template <typename DType>
200inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize,
201 int width) {
202 std::string fmt = format_fwf<DType>(width);
203 snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
204}
205
206template <>
207inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize,
208 int width) {
209 std::string fmt = format_fwf<Int96Type>(width);
210 std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
211 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
212}
213
214template <>
215inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize,
216 int width) {
217 std::string fmt = format_fwf<ByteArrayType>(width);
218 std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
219 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
220}
221
222template <>
223inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize,
224 int width) {
225 std::string fmt = format_fwf<FLBAType>(width);
226 std::string result = FixedLenByteArrayToString(
227 *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
228 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
229}
230
231typedef TypedScanner<BooleanType> BoolScanner;
232typedef TypedScanner<Int32Type> Int32Scanner;
233typedef TypedScanner<Int64Type> Int64Scanner;
234typedef TypedScanner<Int96Type> Int96Scanner;
235typedef TypedScanner<FloatType> FloatScanner;
236typedef TypedScanner<DoubleType> DoubleScanner;
237typedef TypedScanner<ByteArrayType> ByteArrayScanner;
238typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
239
240template <typename RType>
241int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
242 uint8_t* values, int64_t* values_buffered,
243 parquet::ColumnReader* reader) {
244 typedef typename RType::T Type;
245 auto typed_reader = static_cast<RType*>(reader);
246 auto vals = reinterpret_cast<Type*>(&values[0]);
247 return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals,
248 values_buffered);
249}
250
// Non-template, exported entry point for reading one batch from an untyped
// ColumnReader into caller-provided level and value buffers (same parameters as
// ScanAll). The definition is not in this header.
// NOTE(review): presumably dispatches on the reader's physical type to the matching
// ScanAll<...> instantiation — confirm against the implementation file.
int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
                                     int16_t* rep_levels, uint8_t* values,
                                     int64_t* values_buffered,
                                     parquet::ColumnReader* reader);
255
256} // namespace parquet
257
258#endif // PARQUET_COLUMN_SCANNER_H
259