| 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | #ifndef PARQUET_COLUMN_SCANNER_H |
| 19 | #define PARQUET_COLUMN_SCANNER_H |
| 20 | |
| 21 | #include <stdio.h> |
| 22 | #include <cstdint> |
| 23 | #include <memory> |
| 24 | #include <ostream> |
| 25 | #include <string> |
| 26 | #include <vector> |
| 27 | |
| 28 | #include "parquet/column_reader.h" |
| 29 | #include "parquet/exception.h" |
| 30 | #include "parquet/platform.h" |
| 31 | #include "parquet/schema.h" |
| 32 | #include "parquet/types.h" |
| 33 | |
| 34 | namespace parquet { |
| 35 | |
// Batch size used by Scanner, TypedScanner and Scanner::Make when the caller
// does not pass one. The scanners use the batch size both to size their
// level/value buffers and as the count requested from each ReadBatch() call.
static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
| 37 | |
| 38 | class PARQUET_EXPORT Scanner { |
| 39 | public: |
| 40 | explicit Scanner(std::shared_ptr<ColumnReader> reader, |
| 41 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
| 42 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
| 43 | : batch_size_(batch_size), |
| 44 | level_offset_(0), |
| 45 | levels_buffered_(0), |
| 46 | value_buffer_(AllocateBuffer(pool)), |
| 47 | value_offset_(0), |
| 48 | values_buffered_(0), |
| 49 | reader_(reader) { |
| 50 | def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0); |
| 51 | rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0); |
| 52 | } |
| 53 | |
| 54 | virtual ~Scanner() {} |
| 55 | |
| 56 | static std::shared_ptr<Scanner> Make( |
| 57 | std::shared_ptr<ColumnReader> col_reader, |
| 58 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
| 59 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
| 60 | |
| 61 | virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0; |
| 62 | |
| 63 | bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } |
| 64 | |
| 65 | const ColumnDescriptor* descr() const { return reader_->descr(); } |
| 66 | |
| 67 | int64_t batch_size() const { return batch_size_; } |
| 68 | |
| 69 | void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } |
| 70 | |
| 71 | protected: |
| 72 | int64_t batch_size_; |
| 73 | |
| 74 | std::vector<int16_t> def_levels_; |
| 75 | std::vector<int16_t> rep_levels_; |
| 76 | int level_offset_; |
| 77 | int levels_buffered_; |
| 78 | |
| 79 | std::shared_ptr<ResizableBuffer> value_buffer_; |
| 80 | int value_offset_; |
| 81 | int64_t values_buffered_; |
| 82 | |
| 83 | private: |
| 84 | std::shared_ptr<ColumnReader> reader_; |
| 85 | }; |
| 86 | |
| 87 | template <typename DType> |
| 88 | class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { |
| 89 | public: |
| 90 | typedef typename DType::c_type T; |
| 91 | |
| 92 | explicit TypedScanner(std::shared_ptr<ColumnReader> reader, |
| 93 | int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
| 94 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
| 95 | : Scanner(reader, batch_size, pool) { |
| 96 | typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get()); |
| 97 | int value_byte_size = type_traits<DType::type_num>::value_byte_size; |
| 98 | PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); |
| 99 | values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); |
| 100 | } |
| 101 | |
| 102 | virtual ~TypedScanner() {} |
| 103 | |
| 104 | bool NextLevels(int16_t* def_level, int16_t* rep_level) { |
| 105 | if (level_offset_ == levels_buffered_) { |
| 106 | levels_buffered_ = static_cast<int>( |
| 107 | typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), |
| 108 | rep_levels_.data(), values_, &values_buffered_)); |
| 109 | |
| 110 | value_offset_ = 0; |
| 111 | level_offset_ = 0; |
| 112 | if (!levels_buffered_) { |
| 113 | return false; |
| 114 | } |
| 115 | } |
| 116 | *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; |
| 117 | *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; |
| 118 | level_offset_++; |
| 119 | return true; |
| 120 | } |
| 121 | |
| 122 | bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { |
| 123 | if (level_offset_ == levels_buffered_) { |
| 124 | if (!HasNext()) { |
| 125 | // Out of data pages |
| 126 | return false; |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | NextLevels(def_level, rep_level); |
| 131 | *is_null = *def_level < descr()->max_definition_level(); |
| 132 | |
| 133 | if (*is_null) { |
| 134 | return true; |
| 135 | } |
| 136 | |
| 137 | if (value_offset_ == values_buffered_) { |
| 138 | throw ParquetException("Value was non-null, but has not been buffered" ); |
| 139 | } |
| 140 | *val = values_[value_offset_++]; |
| 141 | return true; |
| 142 | } |
| 143 | |
| 144 | // Returns true if there is a next value |
| 145 | bool NextValue(T* val, bool* is_null) { |
| 146 | if (level_offset_ == levels_buffered_) { |
| 147 | if (!HasNext()) { |
| 148 | // Out of data pages |
| 149 | return false; |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | // Out of values |
| 154 | int16_t def_level = -1; |
| 155 | int16_t rep_level = -1; |
| 156 | NextLevels(&def_level, &rep_level); |
| 157 | *is_null = def_level < descr()->max_definition_level(); |
| 158 | |
| 159 | if (*is_null) { |
| 160 | return true; |
| 161 | } |
| 162 | |
| 163 | if (value_offset_ == values_buffered_) { |
| 164 | throw ParquetException("Value was non-null, but has not been buffered" ); |
| 165 | } |
| 166 | *val = values_[value_offset_++]; |
| 167 | return true; |
| 168 | } |
| 169 | |
| 170 | virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) { |
| 171 | T val; |
| 172 | int16_t def_level = -1; |
| 173 | int16_t rep_level = -1; |
| 174 | bool is_null = false; |
| 175 | char buffer[80]; |
| 176 | |
| 177 | if (!Next(&val, &def_level, &rep_level, &is_null)) { |
| 178 | throw ParquetException("No more values buffered" ); |
| 179 | } |
| 180 | |
| 181 | if (with_levels) { |
| 182 | out << " D:" << def_level << " R:" << rep_level << " " ; |
| 183 | if (!is_null) { |
| 184 | out << "V:" ; |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | if (is_null) { |
| 189 | std::string null_fmt = format_fwf<ByteArrayType>(width); |
| 190 | snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL" ); |
| 191 | } else { |
| 192 | FormatValue(&val, buffer, sizeof(buffer), width); |
| 193 | } |
| 194 | out << buffer; |
| 195 | } |
| 196 | |
| 197 | private: |
| 198 | // The ownership of this object is expressed through the reader_ variable in the base |
| 199 | TypedColumnReader<DType>* typed_reader_; |
| 200 | |
| 201 | inline void FormatValue(void* val, char* buffer, int bufsize, int width); |
| 202 | |
| 203 | T* values_; |
| 204 | }; |
| 205 | |
| 206 | template <typename DType> |
| 207 | inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, |
| 208 | int width) { |
| 209 | std::string fmt = format_fwf<DType>(width); |
| 210 | snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); |
| 211 | } |
| 212 | |
| 213 | template <> |
| 214 | inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, |
| 215 | int width) { |
| 216 | std::string fmt = format_fwf<Int96Type>(width); |
| 217 | std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); |
| 218 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
| 219 | } |
| 220 | |
| 221 | template <> |
| 222 | inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, |
| 223 | int width) { |
| 224 | std::string fmt = format_fwf<ByteArrayType>(width); |
| 225 | std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); |
| 226 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
| 227 | } |
| 228 | |
| 229 | template <> |
| 230 | inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, |
| 231 | int width) { |
| 232 | std::string fmt = format_fwf<FLBAType>(width); |
| 233 | std::string result = FixedLenByteArrayToString( |
| 234 | *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); |
| 235 | snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
| 236 | } |
| 237 | |
| 238 | typedef TypedScanner<BooleanType> BoolScanner; |
| 239 | typedef TypedScanner<Int32Type> Int32Scanner; |
| 240 | typedef TypedScanner<Int64Type> Int64Scanner; |
| 241 | typedef TypedScanner<Int96Type> Int96Scanner; |
| 242 | typedef TypedScanner<FloatType> FloatScanner; |
| 243 | typedef TypedScanner<DoubleType> DoubleScanner; |
| 244 | typedef TypedScanner<ByteArrayType> ByteArrayScanner; |
| 245 | typedef TypedScanner<FLBAType> FixedLenByteArrayScanner; |
| 246 | |
| 247 | template <typename RType> |
| 248 | int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
| 249 | uint8_t* values, int64_t* values_buffered, |
| 250 | parquet::ColumnReader* reader) { |
| 251 | typedef typename RType::T Type; |
| 252 | auto typed_reader = static_cast<RType*>(reader); |
| 253 | auto vals = reinterpret_cast<Type*>(&values[0]); |
| 254 | return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, |
| 255 | values_buffered); |
| 256 | } |
| 257 | |
// Non-template entry point taking the same argument list as ScanAll<RType>.
// Only the declaration is visible in this header; the definition lives
// elsewhere.
// NOTE(review): presumably selects the ScanAll instantiation that matches the
// reader's physical type — confirm against the definition.
int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
                                     int16_t* rep_levels, uint8_t* values,
                                     int64_t* values_buffered,
                                     parquet::ColumnReader* reader);
| 262 | |
| 263 | } // namespace parquet |
| 264 | |
| 265 | #endif // PARQUET_COLUMN_SCANNER_H |
| 266 | |