1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_COLUMN_SCANNER_H
19#define PARQUET_COLUMN_SCANNER_H
20
#include <stdio.h>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"
33
34namespace parquet {
35
// Default number of level entries a Scanner asks its ColumnReader for per
// ReadBatch call when no explicit batch size is supplied.
static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE{128};
37
38class PARQUET_EXPORT Scanner {
39 public:
40 explicit Scanner(std::shared_ptr<ColumnReader> reader,
41 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
42 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
43 : batch_size_(batch_size),
44 level_offset_(0),
45 levels_buffered_(0),
46 value_buffer_(AllocateBuffer(pool)),
47 value_offset_(0),
48 values_buffered_(0),
49 reader_(reader) {
50 def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0);
51 rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0);
52 }
53
54 virtual ~Scanner() {}
55
56 static std::shared_ptr<Scanner> Make(
57 std::shared_ptr<ColumnReader> col_reader,
58 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
59 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
60
61 virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0;
62
63 bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
64
65 const ColumnDescriptor* descr() const { return reader_->descr(); }
66
67 int64_t batch_size() const { return batch_size_; }
68
69 void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
70
71 protected:
72 int64_t batch_size_;
73
74 std::vector<int16_t> def_levels_;
75 std::vector<int16_t> rep_levels_;
76 int level_offset_;
77 int levels_buffered_;
78
79 std::shared_ptr<ResizableBuffer> value_buffer_;
80 int value_offset_;
81 int64_t values_buffered_;
82
83 private:
84 std::shared_ptr<ColumnReader> reader_;
85};
86
87template <typename DType>
88class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner {
89 public:
90 typedef typename DType::c_type T;
91
92 explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
93 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
94 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
95 : Scanner(reader, batch_size, pool) {
96 typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
97 int value_byte_size = type_traits<DType::type_num>::value_byte_size;
98 PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
99 values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
100 }
101
102 virtual ~TypedScanner() {}
103
104 bool NextLevels(int16_t* def_level, int16_t* rep_level) {
105 if (level_offset_ == levels_buffered_) {
106 levels_buffered_ = static_cast<int>(
107 typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(),
108 rep_levels_.data(), values_, &values_buffered_));
109
110 value_offset_ = 0;
111 level_offset_ = 0;
112 if (!levels_buffered_) {
113 return false;
114 }
115 }
116 *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
117 *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
118 level_offset_++;
119 return true;
120 }
121
122 bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
123 if (level_offset_ == levels_buffered_) {
124 if (!HasNext()) {
125 // Out of data pages
126 return false;
127 }
128 }
129
130 NextLevels(def_level, rep_level);
131 *is_null = *def_level < descr()->max_definition_level();
132
133 if (*is_null) {
134 return true;
135 }
136
137 if (value_offset_ == values_buffered_) {
138 throw ParquetException("Value was non-null, but has not been buffered");
139 }
140 *val = values_[value_offset_++];
141 return true;
142 }
143
144 // Returns true if there is a next value
145 bool NextValue(T* val, bool* is_null) {
146 if (level_offset_ == levels_buffered_) {
147 if (!HasNext()) {
148 // Out of data pages
149 return false;
150 }
151 }
152
153 // Out of values
154 int16_t def_level = -1;
155 int16_t rep_level = -1;
156 NextLevels(&def_level, &rep_level);
157 *is_null = def_level < descr()->max_definition_level();
158
159 if (*is_null) {
160 return true;
161 }
162
163 if (value_offset_ == values_buffered_) {
164 throw ParquetException("Value was non-null, but has not been buffered");
165 }
166 *val = values_[value_offset_++];
167 return true;
168 }
169
170 virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) {
171 T val;
172 int16_t def_level = -1;
173 int16_t rep_level = -1;
174 bool is_null = false;
175 char buffer[80];
176
177 if (!Next(&val, &def_level, &rep_level, &is_null)) {
178 throw ParquetException("No more values buffered");
179 }
180
181 if (with_levels) {
182 out << " D:" << def_level << " R:" << rep_level << " ";
183 if (!is_null) {
184 out << "V:";
185 }
186 }
187
188 if (is_null) {
189 std::string null_fmt = format_fwf<ByteArrayType>(width);
190 snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
191 } else {
192 FormatValue(&val, buffer, sizeof(buffer), width);
193 }
194 out << buffer;
195 }
196
197 private:
198 // The ownership of this object is expressed through the reader_ variable in the base
199 TypedColumnReader<DType>* typed_reader_;
200
201 inline void FormatValue(void* val, char* buffer, int bufsize, int width);
202
203 T* values_;
204};
205
206template <typename DType>
207inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize,
208 int width) {
209 std::string fmt = format_fwf<DType>(width);
210 snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
211}
212
213template <>
214inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize,
215 int width) {
216 std::string fmt = format_fwf<Int96Type>(width);
217 std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
218 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
219}
220
221template <>
222inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize,
223 int width) {
224 std::string fmt = format_fwf<ByteArrayType>(width);
225 std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
226 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
227}
228
229template <>
230inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize,
231 int width) {
232 std::string fmt = format_fwf<FLBAType>(width);
233 std::string result = FixedLenByteArrayToString(
234 *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
235 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
236}
237
238typedef TypedScanner<BooleanType> BoolScanner;
239typedef TypedScanner<Int32Type> Int32Scanner;
240typedef TypedScanner<Int64Type> Int64Scanner;
241typedef TypedScanner<Int96Type> Int96Scanner;
242typedef TypedScanner<FloatType> FloatScanner;
243typedef TypedScanner<DoubleType> DoubleScanner;
244typedef TypedScanner<ByteArrayType> ByteArrayScanner;
245typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
246
247template <typename RType>
248int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
249 uint8_t* values, int64_t* values_buffered,
250 parquet::ColumnReader* reader) {
251 typedef typename RType::T Type;
252 auto typed_reader = static_cast<RType*>(reader);
253 auto vals = reinterpret_cast<Type*>(&values[0]);
254 return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals,
255 values_buffered);
256}
257
// Non-template counterpart of ScanAll for callers that only hold a type-erased
// ColumnReader. The definition is not in this header. Presumably it dispatches
// on the reader's physical type to ScanAll<TypedColumnReader<...>> (TODO
// confirm). The parameters are assumed to mirror ScanAll's: levels go to
// def_levels/rep_levels, decoded values to the raw `values` buffer, and the
// value count to *values_buffered.
int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
                                     int16_t* rep_levels, uint8_t* values,
                                     int64_t* values_buffered,
                                     parquet::ColumnReader* reader);
262
263} // namespace parquet
264
265#endif // PARQUET_COLUMN_SCANNER_H
266