| 1 | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 | // or more contributor license agreements.  See the NOTICE file | 
| 3 | // distributed with this work for additional information | 
| 4 | // regarding copyright ownership.  The ASF licenses this file | 
| 5 | // to you under the Apache License, Version 2.0 (the | 
| 6 | // "License"); you may not use this file except in compliance | 
| 7 | // with the License.  You may obtain a copy of the License at | 
| 8 | // | 
| 9 | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 | // | 
| 11 | // Unless required by applicable law or agreed to in writing, | 
| 12 | // software distributed under the License is distributed on an | 
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 | // KIND, either express or implied.  See the License for the | 
| 15 | // specific language governing permissions and limitations | 
| 16 | // under the License. | 
| 17 |  | 
| 18 | #pragma once | 
| 19 |  | 
| 20 | #include <cstdint> | 
| 21 | #include <memory> | 
| 22 | #include <vector> | 
| 23 |  | 
| 24 | #include "parquet/exception.h" | 
| 25 | #include "parquet/platform.h" | 
| 26 | #include "parquet/schema.h" | 
| 27 | #include "parquet/types.h" | 
| 28 |  | 
| 29 | namespace arrow { | 
| 30 |  | 
| 31 | class Array; | 
| 32 | class ChunkedArray; | 
| 33 |  | 
| 34 | namespace BitUtil { | 
| 35 | class BitReader; | 
| 36 | }  // namespace BitUtil | 
| 37 |  | 
| 38 | namespace util { | 
| 39 | class RleDecoder; | 
| 40 | }  // namespace util | 
| 41 |  | 
| 42 | }  // namespace arrow | 
| 43 |  | 
| 44 | namespace parquet { | 
| 45 |  | 
| 46 | class Page; | 
| 47 |  | 
// Upper bound on the size of a serialized page header: 16 MB by default.
static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;

// Expected (typical) size of a serialized page header: 16 KB by default.
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;

// The expected header size must never exceed the accepted maximum.
static_assert(kDefaultPageHeaderSize <= kDefaultMaxPageHeaderSize,
              "default page header size must not exceed the maximum");
| 53 |  | 
// Decodes a run of int16 levels (repetition or definition levels) from a raw
// page buffer. The decoder keeps two alternative back ends (an RLE decoder and
// a bit reader); NOTE(review): presumably SetData picks one based on
// `encoding` — confirm in the implementation.
class PARQUET_EXPORT LevelDecoder {
 public:
  LevelDecoder();
  ~LevelDecoder();

  // Initialize the LevelDecoder state with new data
  // and return the number of bytes consumed.
  //
  // encoding: how the levels in `data` are encoded.
  // max_level: the largest level value that can appear; NOTE(review): likely
  //   used to derive bit_width_ — confirm.
  // num_buffered_values: number of levels available in `data`.
  // data: pointer to the start of the encoded levels (not owned).
  int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
              const uint8_t* data);

  // Decodes up to `batch_size` levels into `levels` (which must have room for
  // at least that many entries) and returns the number of levels decoded.
  int Decode(int batch_size, int16_t* levels);

 private:
  // Number of bits used to encode a single level.
  int bit_width_;
  // Levels still available for decoding since the last SetData call.
  int num_values_remaining_;
  // Encoding passed to the most recent SetData call.
  Encoding::type encoding_;
  // Alternative decoder back ends; see class comment.
  std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
  std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_;
};
| 74 |  | 
| 75 | // Abstract page iterator interface. This way, we can feed column pages to the | 
| 76 | // ColumnReader through whatever mechanism we choose | 
| 77 | class PARQUET_EXPORT  { | 
| 78 |  public: | 
| 79 |   virtual () = default; | 
| 80 |  | 
| 81 |   static std::unique_ptr<PageReader> ( | 
| 82 |       const std::shared_ptr<ArrowInputStream>& stream, int64_t total_num_rows, | 
| 83 |       Compression::type codec, | 
| 84 |       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); | 
| 85 |  | 
| 86 |   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page> | 
| 87 |   // containing new Page otherwise | 
| 88 |   virtual std::shared_ptr<Page> () = 0; | 
| 89 |  | 
| 90 |   virtual void (uint32_t size) = 0; | 
| 91 | }; | 
| 92 |  | 
| 93 | class PARQUET_EXPORT ColumnReader { | 
| 94 |  public: | 
| 95 |   virtual ~ColumnReader() = default; | 
| 96 |  | 
| 97 |   static std::shared_ptr<ColumnReader> Make( | 
| 98 |       const ColumnDescriptor* descr, std::unique_ptr<PageReader> , | 
| 99 |       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); | 
| 100 |  | 
| 101 |   // Returns true if there are still values in this column. | 
| 102 |   virtual bool HasNext() = 0; | 
| 103 |  | 
| 104 |   virtual Type::type type() const = 0; | 
| 105 |  | 
| 106 |   virtual const ColumnDescriptor* descr() const = 0; | 
| 107 | }; | 
| 108 |  | 
| 109 | // API to read values from a single column. This is a main client facing API. | 
| 110 | template <typename DType> | 
| 111 | class TypedColumnReader : public ColumnReader { | 
| 112 |  public: | 
| 113 |   typedef typename DType::c_type T; | 
| 114 |  | 
| 115 |   // Read a batch of repetition levels, definition levels, and values from the | 
| 116 |   // column. | 
| 117 |   // | 
| 118 |   // Since null values are not stored in the values, the number of values read | 
| 119 |   // may be less than the number of repetition and definition levels. With | 
| 120 |   // nested data this is almost certainly true. | 
| 121 |   // | 
| 122 |   // Set def_levels or rep_levels to nullptr if you want to skip reading them. | 
| 123 |   // This is only safe if you know through some other source that there are no | 
| 124 |   // undefined values. | 
| 125 |   // | 
| 126 |   // To fully exhaust a row group, you must read batches until the number of | 
| 127 |   // values read reaches the number of stored values according to the metadata. | 
| 128 |   // | 
| 129 |   // This API is the same for both V1 and V2 of the DataPage | 
| 130 |   // | 
| 131 |   // @returns: actual number of levels read (see values_read for number of values read) | 
| 132 |   virtual int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, | 
| 133 |                             T* values, int64_t* values_read) = 0; | 
| 134 |  | 
| 135 |   /// Read a batch of repetition levels, definition levels, and values from the | 
| 136 |   /// column and leave spaces for null entries on the lowest level in the values | 
| 137 |   /// buffer. | 
| 138 |   /// | 
| 139 |   /// In comparision to ReadBatch the length of repetition and definition levels | 
| 140 |   /// is the same as of the number of values read for max_definition_level == 1. | 
| 141 |   /// In the case of max_definition_level > 1, the repetition and definition | 
| 142 |   /// levels are larger than the values but the values include the null entries | 
| 143 |   /// with definition_level == (max_definition_level - 1). | 
| 144 |   /// | 
| 145 |   /// To fully exhaust a row group, you must read batches until the number of | 
| 146 |   /// values read reaches the number of stored values according to the metadata. | 
| 147 |   /// | 
| 148 |   /// @param batch_size the number of levels to read | 
| 149 |   /// @param[out] def_levels The Parquet definition levels, output has | 
| 150 |   ///   the length levels_read. | 
| 151 |   /// @param[out] rep_levels The Parquet repetition levels, output has | 
| 152 |   ///   the length levels_read. | 
| 153 |   /// @param[out] values The values in the lowest nested level including | 
| 154 |   ///   spacing for nulls on the lowest levels; output has the length | 
| 155 |   ///   values_read. | 
| 156 |   /// @param[out] valid_bits Memory allocated for a bitmap that indicates if | 
| 157 |   ///   the row is null or on the maximum definition level. For performance | 
| 158 |   ///   reasons the underlying buffer should be able to store 1 bit more than | 
| 159 |   ///   required. If this requires an additional byte, this byte is only read | 
| 160 |   ///   but never written to. | 
| 161 |   /// @param valid_bits_offset The offset in bits of the valid_bits where the | 
| 162 |   ///   first relevant bit resides. | 
| 163 |   /// @param[out] levels_read The number of repetition/definition levels that were read. | 
| 164 |   /// @param[out] values_read The number of values read, this includes all | 
| 165 |   ///   non-null entries as well as all null-entries on the lowest level | 
| 166 |   ///   (i.e. definition_level == max_definition_level - 1) | 
| 167 |   /// @param[out] null_count The number of nulls on the lowest levels. | 
| 168 |   ///   (i.e. (values_read - null_count) is total number of non-null entries) | 
| 169 |   virtual int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, | 
| 170 |                                   int16_t* rep_levels, T* values, uint8_t* valid_bits, | 
| 171 |                                   int64_t valid_bits_offset, int64_t* levels_read, | 
| 172 |                                   int64_t* values_read, int64_t* null_count) = 0; | 
| 173 |  | 
| 174 |   // Skip reading levels | 
| 175 |   // Returns the number of levels skipped | 
| 176 |   virtual int64_t Skip(int64_t num_rows_to_skip) = 0; | 
| 177 | }; | 
| 178 |  | 
| 179 | namespace internal { | 
| 180 |  | 
| 181 | /// \brief Stateful column reader that delimits semantic records for both flat | 
| 182 | /// and nested columns | 
| 183 | /// | 
| 184 | /// \note API EXPERIMENTAL | 
| 185 | /// \since 1.3.0 | 
| 186 | class RecordReader { | 
| 187 |  public: | 
| 188 |   static std::shared_ptr<RecordReader> Make( | 
| 189 |       const ColumnDescriptor* descr, | 
| 190 |       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), | 
| 191 |       const bool read_dictionary = false); | 
| 192 |  | 
| 193 |   virtual ~RecordReader() = default; | 
| 194 |  | 
| 195 |   /// \brief Attempt to read indicated number of records from column chunk | 
| 196 |   /// \return number of records read | 
| 197 |   virtual int64_t ReadRecords(int64_t num_records) = 0; | 
| 198 |  | 
| 199 |   /// \brief Pre-allocate space for data. Results in better flat read performance | 
| 200 |   virtual void Reserve(int64_t num_values) = 0; | 
| 201 |  | 
| 202 |   /// \brief Clear consumed values and repetition/definition levels as the | 
| 203 |   /// result of calling ReadRecords | 
| 204 |   virtual void Reset() = 0; | 
| 205 |  | 
| 206 |   /// \brief Transfer filled values buffer to caller. A new one will be | 
| 207 |   /// allocated in subsequent ReadRecords calls | 
| 208 |   virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0; | 
| 209 |  | 
| 210 |   /// \brief Transfer filled validity bitmap buffer to caller. A new one will | 
| 211 |   /// be allocated in subsequent ReadRecords calls | 
| 212 |   virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0; | 
| 213 |  | 
| 214 |   /// \brief Return true if the record reader has more internal data yet to | 
| 215 |   /// process | 
| 216 |   virtual bool HasMoreData() const = 0; | 
| 217 |  | 
| 218 |   /// \brief Advance record reader to the next row group | 
| 219 |   /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader | 
| 220 |   virtual void (std::unique_ptr<PageReader> reader) = 0; | 
| 221 |  | 
| 222 |   virtual void DebugPrintState() = 0; | 
| 223 |  | 
| 224 |   /// \brief Decoded definition levels | 
| 225 |   int16_t* def_levels() const { | 
| 226 |     return reinterpret_cast<int16_t*>(def_levels_->mutable_data()); | 
| 227 |   } | 
| 228 |  | 
| 229 |   /// \brief Decoded repetition levels | 
| 230 |   int16_t* rep_levels() const { | 
| 231 |     return reinterpret_cast<int16_t*>(rep_levels_->mutable_data()); | 
| 232 |   } | 
| 233 |  | 
| 234 |   /// \brief Decoded values, including nulls, if any | 
| 235 |   uint8_t* values() const { return values_->mutable_data(); } | 
| 236 |  | 
| 237 |   /// \brief Number of values written including nulls (if any) | 
| 238 |   int64_t values_written() const { return values_written_; } | 
| 239 |  | 
| 240 |   /// \brief Number of definition / repetition levels (from those that have | 
| 241 |   /// been decoded) that have been consumed inside the reader. | 
| 242 |   int64_t levels_position() const { return levels_position_; } | 
| 243 |  | 
| 244 |   /// \brief Number of definition / repetition levels that have been written | 
| 245 |   /// internally in the reader | 
| 246 |   int64_t levels_written() const { return levels_written_; } | 
| 247 |  | 
| 248 |   /// \brief Number of nulls in the leaf | 
| 249 |   int64_t null_count() const { return null_count_; } | 
| 250 |  | 
| 251 |   /// \brief True if the leaf values are nullable | 
| 252 |   bool nullable_values() const { return nullable_values_; } | 
| 253 |  | 
| 254 |   /// \brief True if reading directly as Arrow dictionary-encoded | 
| 255 |   bool read_dictionary() const { return read_dictionary_; } | 
| 256 |  | 
| 257 |  protected: | 
| 258 |   bool nullable_values_; | 
| 259 |  | 
| 260 |   bool at_record_start_; | 
| 261 |   int64_t records_read_; | 
| 262 |  | 
| 263 |   int64_t values_written_; | 
| 264 |   int64_t values_capacity_; | 
| 265 |   int64_t null_count_; | 
| 266 |  | 
| 267 |   int64_t levels_written_; | 
| 268 |   int64_t levels_position_; | 
| 269 |   int64_t levels_capacity_; | 
| 270 |  | 
| 271 |   std::shared_ptr<::arrow::ResizableBuffer> values_; | 
| 272 |   // In the case of false, don't allocate the values buffer (when we directly read into | 
| 273 |   // builder classes). | 
| 274 |   bool uses_values_; | 
| 275 |  | 
| 276 |   std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; | 
| 277 |   std::shared_ptr<::arrow::ResizableBuffer> def_levels_; | 
| 278 |   std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; | 
| 279 |  | 
| 280 |   bool read_dictionary_ = false; | 
| 281 | }; | 
| 282 |  | 
| 283 | class BinaryRecordReader : virtual public RecordReader { | 
| 284 |  public: | 
| 285 |   virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0; | 
| 286 | }; | 
| 287 |  | 
| 288 | /// \brief Read records directly to dictionary-encoded Arrow form (int32 | 
| 289 | /// indices). Only valid for BYTE_ARRAY columns | 
| 290 | class DictionaryRecordReader : virtual public RecordReader { | 
| 291 |  public: | 
| 292 |   virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0; | 
| 293 | }; | 
| 294 |  | 
| 295 | static inline void DefinitionLevelsToBitmap( | 
| 296 |     const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, | 
| 297 |     const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, | 
| 298 |     uint8_t* valid_bits, int64_t valid_bits_offset) { | 
| 299 |   // We assume here that valid_bits is large enough to accommodate the | 
| 300 |   // additional definition levels and the ones that have already been written | 
| 301 |   ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset, | 
| 302 |                                                     num_def_levels); | 
| 303 |  | 
| 304 |   // TODO(itaiin): As an interim solution we are splitting the code path here | 
| 305 |   // between repeated+flat column reads, and non-repeated+nested reads. | 
| 306 |   // Those paths need to be merged in the future | 
| 307 |   for (int i = 0; i < num_def_levels; ++i) { | 
| 308 |     if (def_levels[i] == max_definition_level) { | 
| 309 |       valid_bits_writer.Set(); | 
| 310 |     } else if (max_repetition_level > 0) { | 
| 311 |       // repetition+flat case | 
| 312 |       if (def_levels[i] == (max_definition_level - 1)) { | 
| 313 |         valid_bits_writer.Clear(); | 
| 314 |         *null_count += 1; | 
| 315 |       } else { | 
| 316 |         continue; | 
| 317 |       } | 
| 318 |     } else { | 
| 319 |       // non-repeated+nested case | 
| 320 |       if (def_levels[i] < max_definition_level) { | 
| 321 |         valid_bits_writer.Clear(); | 
| 322 |         *null_count += 1; | 
| 323 |       } else { | 
| 324 |         throw ParquetException("definition level exceeds maximum" ); | 
| 325 |       } | 
| 326 |     } | 
| 327 |  | 
| 328 |     valid_bits_writer.Next(); | 
| 329 |   } | 
| 330 |   valid_bits_writer.Finish(); | 
| 331 |   *values_read = valid_bits_writer.position(); | 
| 332 | } | 
| 333 |  | 
| 334 | }  // namespace internal | 
| 335 |  | 
| 336 | namespace internal { | 
| 337 |  | 
| 338 | // TODO(itaiin): another code path split to merge when the general case is done | 
| 339 | static inline bool HasSpacedValues(const ColumnDescriptor* descr) { | 
| 340 |   if (descr->max_repetition_level() > 0) { | 
| 341 |     // repeated+flat case | 
| 342 |     return !descr->schema_node()->is_required(); | 
| 343 |   } else { | 
| 344 |     // non-repeated+nested case | 
| 345 |     // Find if a node forces nulls in the lowest level along the hierarchy | 
| 346 |     const schema::Node* node = descr->schema_node().get(); | 
| 347 |     while (node) { | 
| 348 |       if (node->is_optional()) { | 
| 349 |         return true; | 
| 350 |       } | 
| 351 |       node = node->parent(); | 
| 352 |     } | 
| 353 |     return false; | 
| 354 |   } | 
| 355 | } | 
| 356 |  | 
| 357 | }  // namespace internal | 
| 358 |  | 
// Convenience aliases: one TypedColumnReader instantiation per Parquet type
// tag, listed in the same order as the type tags they wrap.
using BoolReader = TypedColumnReader<BooleanType>;
using Int32Reader = TypedColumnReader<Int32Type>;
using Int64Reader = TypedColumnReader<Int64Type>;
using Int96Reader = TypedColumnReader<Int96Type>;
using FloatReader = TypedColumnReader<FloatType>;
using DoubleReader = TypedColumnReader<DoubleType>;
using ByteArrayReader = TypedColumnReader<ByteArrayType>;
using FixedLenByteArrayReader = TypedColumnReader<FLBAType>;
| 367 |  | 
| 368 | }  // namespace parquet | 
| 369 |  |